diff --git a/src/dbnode/persist/fs/seek_manager_test.go b/src/dbnode/persist/fs/seek_manager_test.go index ab190f1065..8d27e2cb63 100644 --- a/src/dbnode/persist/fs/seek_manager_test.go +++ b/src/dbnode/persist/fs/seek_manager_test.go @@ -457,7 +457,7 @@ func TestSeekerManagerAssignShardSet(t *testing.T) { var ( wg sync.WaitGroup mockSeekerStatsLock sync.Mutex - numMockSeekerClosesByShardAndBlockStart = make(map[uint32]map[time.Time]int) + numMockSeekerClosesByShardAndBlockStart = make(map[uint32]map[xtime.UnixNano]int) ) m.newOpenSeekerFn = func( shard uint32, @@ -476,10 +476,10 @@ func TestSeekerManagerAssignShardSet(t *testing.T) { mockSeekerStatsLock.Lock() numMockSeekerClosesByBlockStart, ok := numMockSeekerClosesByShardAndBlockStart[shard] if !ok { - numMockSeekerClosesByBlockStart = make(map[time.Time]int) + numMockSeekerClosesByBlockStart = make(map[xtime.UnixNano]int) numMockSeekerClosesByShardAndBlockStart[shard] = numMockSeekerClosesByBlockStart } - numMockSeekerClosesByBlockStart[blockStart]++ + numMockSeekerClosesByBlockStart[xtime.ToUnixNano(blockStart)]++ mockSeekerStatsLock.Unlock() wg.Done() return nil @@ -526,7 +526,9 @@ func TestSeekerManagerAssignShardSet(t *testing.T) { mockSeekerStatsLock.Lock() for _, numMockSeekerClosesByBlockStart := range numMockSeekerClosesByShardAndBlockStart { - require.Equal(t, defaultTestingFetchConcurrency, numMockSeekerClosesByBlockStart[blockStart]) + require.Equal(t, + defaultTestingFetchConcurrency, + numMockSeekerClosesByBlockStart[xtime.ToUnixNano(blockStart)]) } mockSeekerStatsLock.Unlock() @@ -548,7 +550,7 @@ func TestSeekerManagerAssignShardSet(t *testing.T) { mockSeekerStatsLock.Lock() for _, numMockSeekerClosesByBlockStart := range numMockSeekerClosesByShardAndBlockStart { for start, numMockSeekerCloses := range numMockSeekerClosesByBlockStart { - if blockStart == start { + if xtime.ToUnixNano(blockStart) == start { // NB(bodu): These get closed twice since they've been closed once already due to updating 
their block lease. require.Equal(t, defaultTestingFetchConcurrency*2, numMockSeekerCloses) continue diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index b673c0f6a2..924bdaac00 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -334,7 +334,7 @@ func (s *fileSystemSource) bootstrapFromReaders( for timeWindowReaders := range readersCh { // NB(bodu): Since we are re-using the same builder for all bootstrapped index blocks, // it is not thread safe and requires reset after every processed index block. - builder.Builder().Reset(0) + builder.Builder().Reset() s.loadShardReadersDataIntoShardResult(run, ns, accumulator, runOpts, runResult, resultOpts, timeWindowReaders, readerPool, builder) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go index d978b1c9f6..c4b59095fd 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go @@ -145,7 +145,7 @@ func writeTSDBPersistedIndexBlock( shards map[uint32]struct{}, block []testSeries, ) { - seg, err := mem.NewSegment(0, mem.NewOptions()) + seg, err := mem.NewSegment(mem.NewOptions()) require.NoError(t, err) for _, series := range block { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index 585c4903ba..d76efed0df 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -712,7 +712,7 @@ func (s *peersSource) readIndex( for timeWindowReaders := range readersCh { // NB(bodu): Since we are re-using the same builder for all bootstrapped index blocks, // it is not thread safe and requires reset after every processed index block. 
- builder.Builder().Reset(0) + builder.Builder().Reset() // NB(bodu): This is fetching the data for all shards for a block of time. remainingRanges, timesWithErrors := s.processReaders( diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index 178f474738..52407b8ffc 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -1168,7 +1168,7 @@ func (i *nsIndex) flushBlockSegment( builder segment.DocumentsBuilder, ) error { // Reset the builder - builder.Reset(0) + builder.Reset() var ( batch = m3ninxindex.Batch{AllowPartialUpdates: true} @@ -1895,10 +1895,10 @@ func (i *nsIndex) CleanupDuplicateFileSets() error { fsOpts.InfoReaderBufferSize(), ) - segmentsOrderByVolumeIndexByVolumeTypeAndBlockStart := make(map[time.Time]map[idxpersist.IndexVolumeType][]fs.Segments) + segmentsOrderByVolumeIndexByVolumeTypeAndBlockStart := make(map[xtime.UnixNano]map[idxpersist.IndexVolumeType][]fs.Segments) for _, file := range infoFiles { seg := fs.NewSegments(file.Info, file.ID.VolumeIndex, file.AbsoluteFilePaths) - blockStart := seg.BlockStart() + blockStart := xtime.ToUnixNano(seg.BlockStart()) segmentsOrderByVolumeIndexByVolumeType, ok := segmentsOrderByVolumeIndexByVolumeTypeAndBlockStart[blockStart] if !ok { segmentsOrderByVolumeIndexByVolumeType = make(map[idxpersist.IndexVolumeType][]fs.Segments) diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index 4a3f86da34..5bb078b676 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -2117,7 +2117,7 @@ func assertAggregateResultsMapEquals(t *testing.T, expected map[string][]string, } func testSegment(t *testing.T, docs ...doc.Document) segment.Segment { - seg, err := mem.NewSegment(0, testOpts.MemSegmentOptions()) + seg, err := mem.NewSegment(testOpts.MemSegmentOptions()) require.NoError(t, err) for _, d := range docs { diff --git a/src/dbnode/storage/index/compaction/compactor.go 
b/src/dbnode/storage/index/compaction/compactor.go index 8fc53a7f6c..288d63ef35 100644 --- a/src/dbnode/storage/index/compaction/compactor.go +++ b/src/dbnode/storage/index/compaction/compactor.go @@ -114,7 +114,7 @@ func (c *Compactor) Compact( return nil, errCompactorClosed } - c.builder.Reset(0) + c.builder.Reset() if err := c.builder.AddSegments(segs); err != nil { return nil, err } @@ -236,7 +236,7 @@ func (c *Compactor) compactFromBuilderWithLock( // Release resources regardless of result, // otherwise old compacted segments are held onto // strongly - builder.Reset(0) + builder.Reset() }() // Since this builder is likely reused between compaction @@ -275,7 +275,7 @@ func (c *Compactor) compactFromBuilderWithLock( // rather than encoding them and mmap'ing the encoded documents. allDocsCopy := make([]doc.Document, len(allDocs)) copy(allDocsCopy, allDocs) - fstData.DocsReader = docs.NewSliceReader(0, allDocsCopy) + fstData.DocsReader = docs.NewSliceReader(allDocsCopy) } else { // Otherwise encode and reference the encoded bytes as mmap'd bytes. 
c.buff.Reset() diff --git a/src/dbnode/storage/index/compaction/compactor_test.go b/src/dbnode/storage/index/compaction/compactor_test.go index 405bcad4b3..7a40e12b7b 100644 --- a/src/dbnode/storage/index/compaction/compactor_test.go +++ b/src/dbnode/storage/index/compaction/compactor_test.go @@ -81,7 +81,7 @@ func init() { } func TestCompactorSingleMutableSegment(t *testing.T) { - seg, err := mem.NewSegment(0, testMemSegmentOptions) + seg, err := mem.NewSegment(testMemSegmentOptions) require.NoError(t, err) _, err = seg.Insert(testDocuments[0]) @@ -105,7 +105,7 @@ func TestCompactorSingleMutableSegment(t *testing.T) { } func TestCompactorSingleMutableSegmentWithMmapDocsData(t *testing.T) { - seg, err := mem.NewSegment(0, testMemSegmentOptions) + seg, err := mem.NewSegment(testMemSegmentOptions) require.NoError(t, err) _, err = seg.Insert(testDocuments[0]) @@ -131,13 +131,13 @@ func TestCompactorSingleMutableSegmentWithMmapDocsData(t *testing.T) { } func TestCompactorManySegments(t *testing.T) { - seg1, err := mem.NewSegment(0, testMemSegmentOptions) + seg1, err := mem.NewSegment(testMemSegmentOptions) require.NoError(t, err) _, err = seg1.Insert(testDocuments[0]) require.NoError(t, err) - seg2, err := mem.NewSegment(0, testMemSegmentOptions) + seg2, err := mem.NewSegment(testMemSegmentOptions) require.NoError(t, err) _, err = seg2.Insert(testDocuments[1]) @@ -159,13 +159,13 @@ func TestCompactorManySegments(t *testing.T) { } func TestCompactorCompactDuplicateIDsNoError(t *testing.T) { - seg1, err := mem.NewSegment(0, testMemSegmentOptions) + seg1, err := mem.NewSegment(testMemSegmentOptions) require.NoError(t, err) _, err = seg1.Insert(testDocuments[0]) require.NoError(t, err) - seg2, err := mem.NewSegment(0, testMemSegmentOptions) + seg2, err := mem.NewSegment(testMemSegmentOptions) require.NoError(t, err) _, err = seg2.Insert(testDocuments[0]) diff --git a/src/dbnode/storage/index/fields_terms_iterator_test.go 
b/src/dbnode/storage/index/fields_terms_iterator_test.go index fbe3eb743f..b1ae28c8dd 100644 --- a/src/dbnode/storage/index/fields_terms_iterator_test.go +++ b/src/dbnode/storage/index/fields_terms_iterator_test.go @@ -222,7 +222,7 @@ func TestFieldsTermsIteratorIterateTermsAndRestrictByQuery(t *testing.T) { }, } - seg, err := mem.NewSegment(0, mem.NewOptions()) + seg, err := mem.NewSegment(mem.NewOptions()) require.NoError(t, err) require.NoError(t, seg.InsertBatch(m3ninxindex.Batch{ diff --git a/src/dbnode/storage/index/mutable_segments.go b/src/dbnode/storage/index/mutable_segments.go index 2fcc0ea1da..baa904cd7b 100644 --- a/src/dbnode/storage/index/mutable_segments.go +++ b/src/dbnode/storage/index/mutable_segments.go @@ -171,7 +171,7 @@ func (m *mutableSegments) WriteBatch(inserts *WriteBatch) error { m.Unlock() }() - builder.Reset(0) + builder.Reset() insertResultErr := builder.InsertBatch(m3ninxindex.Batch{ Docs: inserts.PendingDocs(), AllowPartialUpdates: true, @@ -621,7 +621,7 @@ func (m *mutableSegments) foregroundCompactWithBuilder(builder segment.Documents return errForegroundCompactorBadPlanSecondaryTask } // Now use the builder after resetting it. - builder.Reset(0) + builder.Reset() if err := m.foregroundCompactWithTask( builder, task, log, logger.With(zap.Int("task", i)), diff --git a/src/m3ninx/index/segment/builder/builder.go b/src/m3ninx/index/segment/builder/builder.go index 2edc23fd0a..ac1cd597e5 100644 --- a/src/m3ninx/index/segment/builder/builder.go +++ b/src/m3ninx/index/segment/builder/builder.go @@ -177,8 +177,6 @@ type builder struct { opts Options newUUIDFn util.NewUUIDFn - offset postings.ID - batchSizeOne index.Batch docs []doc.Document idSet *IDsMap @@ -286,12 +284,10 @@ func (b *builder) IndexConcurrency() int { return b.concurrency } -func (b *builder) Reset(offset postings.ID) { +func (b *builder) Reset() { b.status.Lock() defer b.status.Unlock() - b.offset = offset - // Reset the documents slice. 
var empty doc.Document for i := range b.docs { @@ -485,8 +481,7 @@ func (b *builder) AllDocs() (index.IDDocIterator, error) { b.status.RLock() defer b.status.RUnlock() - rangeIter := postings.NewRangeIterator(b.offset, - b.offset+postings.ID(len(b.docs))) + rangeIter := postings.NewRangeIterator(0, postings.ID(len(b.docs))) return index.NewIDDocIterator(b, rangeIter), nil } @@ -494,7 +489,7 @@ func (b *builder) Doc(id postings.ID) (doc.Document, error) { b.status.RLock() defer b.status.RUnlock() - idx := int(id - b.offset) + idx := int(id) if idx < 0 || idx >= len(b.docs) { return doc.Document{}, errDocNotFound } diff --git a/src/m3ninx/index/segment/builder/builder_test.go b/src/m3ninx/index/segment/builder/builder_test.go index 0d496bd32a..2a62ae45c1 100644 --- a/src/m3ninx/index/segment/builder/builder_test.go +++ b/src/m3ninx/index/segment/builder/builder_test.go @@ -85,7 +85,7 @@ func TestBuilderFields(t *testing.T) { }() for i := 0; i < 10; i++ { - builder.Reset(0) + builder.Reset() knownsFields := map[string]struct{}{} for _, d := range testDocuments { @@ -115,7 +115,7 @@ func TestBuilderTerms(t *testing.T) { }() for i := 0; i < 10; i++ { - builder.Reset(0) + builder.Reset() knownsFields := map[string]map[string]struct{}{} for _, d := range testDocuments { diff --git a/src/m3ninx/index/segment/builder/multi_segments_builder.go b/src/m3ninx/index/segment/builder/multi_segments_builder.go index 9dc41e013c..459b93f524 100644 --- a/src/m3ninx/index/segment/builder/multi_segments_builder.go +++ b/src/m3ninx/index/segment/builder/multi_segments_builder.go @@ -36,7 +36,6 @@ type builderFromSegments struct { idSet *IDsMap segments []segmentMetadata termsIter *termsIterFromSegments - offset postings.ID segmentsOffset postings.ID } @@ -60,9 +59,7 @@ func NewBuilderFromSegments(opts Options) segment.SegmentsBuilder { } } -func (b *builderFromSegments) Reset(offset postings.ID) { - b.offset = offset - +func (b *builderFromSegments) Reset() { // Reset the documents slice 
var emptyDoc doc.Document for i := range b.docs { @@ -152,13 +149,12 @@ func (b *builderFromSegments) Docs() []doc.Document { } func (b *builderFromSegments) AllDocs() (index.IDDocIterator, error) { - rangeIter := postings.NewRangeIterator(b.offset, - b.offset+postings.ID(len(b.docs))) + rangeIter := postings.NewRangeIterator(0, postings.ID(len(b.docs))) return index.NewIDDocIterator(b, rangeIter), nil } func (b *builderFromSegments) Doc(id postings.ID) (doc.Document, error) { - idx := int(id - b.offset) + idx := int(id) if idx < 0 || idx >= len(b.docs) { return doc.Document{}, errDocNotFound } diff --git a/src/m3ninx/index/segment/builder/multi_segments_field_iter_test.go b/src/m3ninx/index/segment/builder/multi_segments_field_iter_test.go index 31b07c59e5..f7fc85d2a2 100644 --- a/src/m3ninx/index/segment/builder/multi_segments_field_iter_test.go +++ b/src/m3ninx/index/segment/builder/multi_segments_field_iter_test.go @@ -153,7 +153,7 @@ func newTestSegmentWithDocs( t *testing.T, docs []doc.Document, ) segment.Segment { - seg, err := mem.NewSegment(0, testMemOptions) + seg, err := mem.NewSegment(testMemOptions) require.NoError(t, err) defer func() { diff --git a/src/m3ninx/index/segment/builder/multi_segments_field_postings_list_iter_test.go b/src/m3ninx/index/segment/builder/multi_segments_field_postings_list_iter_test.go index 54b3cf020a..651a4ed346 100644 --- a/src/m3ninx/index/segment/builder/multi_segments_field_postings_list_iter_test.go +++ b/src/m3ninx/index/segment/builder/multi_segments_field_postings_list_iter_test.go @@ -104,7 +104,7 @@ func TestFieldPostingsListIterFromSegments(t *testing.T) { }), } builder := NewBuilderFromSegments(testOptions) - builder.Reset(0) + builder.Reset() b, ok := builder.(*builderFromSegments) require.True(t, ok) diff --git a/src/m3ninx/index/segment/builder/multi_segments_terms_iter_test.go b/src/m3ninx/index/segment/builder/multi_segments_terms_iter_test.go index 0adb11fed5..65219d12c5 100644 --- 
a/src/m3ninx/index/segment/builder/multi_segments_terms_iter_test.go +++ b/src/m3ninx/index/segment/builder/multi_segments_terms_iter_test.go @@ -80,7 +80,7 @@ func TestTermsIterFromSegmentsDeduplicates(t *testing.T) { } builder := NewBuilderFromSegments(testOptions) - builder.Reset(0) + builder.Reset() require.NoError(t, builder.AddSegments(segments)) iter, err := builder.Terms([]byte("fruit")) require.NoError(t, err) diff --git a/src/m3ninx/index/segment/fst/encoding/docs/slice.go b/src/m3ninx/index/segment/fst/encoding/docs/slice.go index ce4f6daa03..ccf7fd48b6 100644 --- a/src/m3ninx/index/segment/fst/encoding/docs/slice.go +++ b/src/m3ninx/index/segment/fst/encoding/docs/slice.go @@ -32,23 +32,18 @@ var ( errDocNotFound = errors.New("doc not found") ) +var _ Reader = (*SliceReader)(nil) var _ index.DocRetriever = (*SliceReader)(nil) // SliceReader is a docs slice reader for use with documents // stored in memory. type SliceReader struct { - offset postings.ID - docs []doc.Document + docs []doc.Document } // NewSliceReader returns a new docs slice reader. -func NewSliceReader(offset postings.ID, docs []doc.Document) *SliceReader { - return &SliceReader{offset: offset, docs: docs} -} - -// Base returns the postings ID base offset of the slice reader. -func (r *SliceReader) Base() postings.ID { - return r.offset +func NewSliceReader(docs []doc.Document) *SliceReader { + return &SliceReader{docs: docs} } // Len returns the number of documents in the slice reader. @@ -58,7 +53,7 @@ func (r *SliceReader) Len() int { // Read returns a document from the docs slice reader. func (r *SliceReader) Read(id postings.ID) (doc.Document, error) { - idx := int(id - r.offset) + idx := int(id) if idx < 0 || idx >= len(r.docs) { return doc.Document{}, errDocNotFound } @@ -71,9 +66,8 @@ func (r *SliceReader) Doc(id postings.ID) (doc.Document, error) { return r.Read(id) } -// AllDocs returns a docs iterator. 
-func (r *SliceReader) AllDocs() index.IDDocIterator { - postingsIter := postings.NewRangeIterator(r.offset, - r.offset+postings.ID(r.Len())) +// Iter returns a docs iterator. +func (r *SliceReader) Iter() index.IDDocIterator { + postingsIter := postings.NewRangeIterator(0, postings.ID(r.Len())) return index.NewIDDocIterator(r, postingsIter) } diff --git a/src/m3ninx/index/segment/fst/encoding/docs/types.go b/src/m3ninx/index/segment/fst/encoding/docs/types.go new file mode 100644 index 0000000000..cf531d2608 --- /dev/null +++ b/src/m3ninx/index/segment/fst/encoding/docs/types.go @@ -0,0 +1,37 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package docs + +import ( + "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/m3ninx/index" + "github.com/m3db/m3/src/m3ninx/postings" +) + +// Reader is a document reader from an encoded source. 
+type Reader interface { + // Len is the number of documents contained by the reader. + Len() int + // Read reads a document with the given postings ID. + Read(id postings.ID) (doc.Document, error) + // Iter returns a document iterator. + Iter() index.IDDocIterator +} diff --git a/src/m3ninx/index/segment/fst/segment.go b/src/m3ninx/index/segment/fst/segment.go index 0a1a5240d5..9940e9ef47 100644 --- a/src/m3ninx/index/segment/fst/segment.go +++ b/src/m3ninx/index/segment/fst/segment.go @@ -72,7 +72,7 @@ type SegmentData struct { // the docs data and docs idx data if the documents // already reside in memory and we want to use the // in memory references instead. - DocsReader *docs.SliceReader + DocsReader docs.Reader Closer io.Closer } @@ -131,38 +131,27 @@ func NewSegment(data SegmentData, opts Options) (Segment, error) { } var ( - docsSliceReader = data.DocsReader - docsDataReader *docs.DataReader - docsIndexReader *docs.IndexReader - startInclusive postings.ID - endExclusive postings.ID + docsThirdPartyReader = data.DocsReader + docsDataReader *docs.DataReader + docsIndexReader *docs.IndexReader ) - if docsSliceReader != nil { - startInclusive = docsSliceReader.Base() - endExclusive = startInclusive + postings.ID(docsSliceReader.Len()) - } else { + if docsThirdPartyReader == nil { docsDataReader = docs.NewDataReader(data.DocsData.Bytes) docsIndexReader, err = docs.NewIndexReader(data.DocsIdxData.Bytes) if err != nil { return nil, fmt.Errorf("unable to load documents index: %v", err) } - - // NB(jeromefroe): Currently we assume the postings IDs are contiguous. 
- startInclusive = docsIndexReader.Base() - endExclusive = startInclusive + postings.ID(docsIndexReader.Len()) } s := &fsSegment{ - fieldsFST: fieldsFST, - docsDataReader: docsDataReader, - docsIndexReader: docsIndexReader, - docsSliceReader: docsSliceReader, + fieldsFST: fieldsFST, + docsDataReader: docsDataReader, + docsIndexReader: docsIndexReader, + docsThirdPartyReader: docsThirdPartyReader, - data: data, - opts: opts, - numDocs: metadata.NumDocs, - startInclusive: startInclusive, - endExclusive: endExclusive, + data: data, + opts: opts, + numDocs: metadata.NumDocs, } // NB(r): The segment uses the context finalization to finalize @@ -180,19 +169,17 @@ var _ segment.ImmutableSegment = (*fsSegment)(nil) type fsSegment struct { sync.RWMutex - ctx context.Context - closed bool - finalized bool - fieldsFST *vellum.FST - docsDataReader *docs.DataReader - docsIndexReader *docs.IndexReader - docsSliceReader *docs.SliceReader - data SegmentData - opts Options - - numDocs int64 - startInclusive postings.ID - endExclusive postings.ID + ctx context.Context + closed bool + finalized bool + fieldsFST *vellum.FST + docsDataReader *docs.DataReader + docsIndexReader *docs.IndexReader + docsThirdPartyReader docs.Reader + data SegmentData + opts Options + + numDocs int64 } func (r *fsSegment) SegmentData(ctx context.Context) (SegmentData, error) { @@ -623,7 +610,7 @@ func (r *fsSegment) matchAllNotClosedMaybeFinalizedWithRLock() (postings.Mutable } pl := r.opts.PostingsListPool().Get() - err := pl.AddRange(r.startInclusive, r.endExclusive) + err := pl.AddRange(0, postings.ID(r.numDocs)) if err != nil { return nil, err } @@ -648,8 +635,8 @@ func (r *fsSegment) docNotClosedMaybeFinalizedWithRLock(id postings.ID) (doc.Doc } - // If using docs slice reader, return from the in memory slice reader - if r.docsSliceReader != nil { - return r.docsSliceReader.Read(id) + // If a docs reader was supplied with the segment data, read the document directly from it + if r.docsThirdPartyReader != nil { + return r.docsThirdPartyReader.Read(id) } offset, err := r.docsIndexReader.Read(id) 
@@ -700,7 +687,7 @@ func (r *fsSegment) allDocsNotClosedMaybeFinalizedWithRLock( return nil, errReaderFinalized } - pi := postings.NewRangeIterator(r.startInclusive, r.endExclusive) + pi := postings.NewRangeIterator(0, postings.ID(r.numDocs)) return index.NewIDDocIterator(retriever, pi), nil } diff --git a/src/m3ninx/index/segment/fst/writer_reader_test.go b/src/m3ninx/index/segment/fst/writer_reader_test.go index 68ea96780e..5fe5a60a36 100644 --- a/src/m3ninx/index/segment/fst/writer_reader_test.go +++ b/src/m3ninx/index/segment/fst/writer_reader_test.go @@ -579,7 +579,7 @@ func newTestSegments(t *testing.T, docs []doc.Document) (memSeg sgmt.MutableSegm func newTestMemSegment(t *testing.T) sgmt.MutableSegment { opts := mem.NewOptions() - s, err := mem.NewSegment(postings.ID(0), opts) + s, err := mem.NewSegment(opts) require.NoError(t, err) return s } diff --git a/src/m3ninx/index/segment/mem/merge_test.go b/src/m3ninx/index/segment/mem/merge_test.go index 06f32cb3e2..0a89b41112 100644 --- a/src/m3ninx/index/segment/mem/merge_test.go +++ b/src/m3ninx/index/segment/mem/merge_test.go @@ -25,7 +25,6 @@ import ( "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/index" - "github.com/m3db/m3/src/m3ninx/postings" "github.com/stretchr/testify/require" ) @@ -76,19 +75,19 @@ func TestMemSegmentMerge(t *testing.T) { rest := docs[1:] opts := NewOptions() - m1, err := NewSegment(postings.ID(0), opts) + m1, err := NewSegment(opts) require.NoError(t, err) _, err = m1.Insert(d) require.NoError(t, err) - m2, err := NewSegment(postings.ID(0), opts) + m2, err := NewSegment(opts) require.NoError(t, err) for _, d := range rest { _, err = m2.Insert(d) require.NoError(t, err) } - m3, err := NewSegment(postings.ID(0), opts) + m3, err := NewSegment(opts) require.NoError(t, err) require.NoError(t, Merge(m3, m1, m2)) diff --git a/src/m3ninx/index/segment/mem/segment.go b/src/m3ninx/index/segment/mem/segment.go index 33aedd7268..9a45d1b2ec 100644 --- 
a/src/m3ninx/index/segment/mem/segment.go +++ b/src/m3ninx/index/segment/mem/segment.go @@ -68,19 +68,18 @@ type memSegment struct { -// NewSegment returns a new in-memory mutable segment. It will start assigning -// postings IDs at the provided offset. +// NewSegment returns a new in-memory mutable segment. Postings IDs are always +// assigned starting from zero. -func NewSegment(offset postings.ID, opts Options) (segment.MutableSegment, error) { +func NewSegment(opts Options) (segment.MutableSegment, error) { s := &memSegment{ - offset: int(offset), plPool: opts.PostingsListPool(), newUUIDFn: opts.NewUUIDFn(), termsDict: newTermsDict(opts), - readerID: postings.NewAtomicID(offset), + readerID: postings.NewAtomicID(0), } s.docs.data = make([]doc.Document, opts.InitialCapacity()) s.writer.idSet = newIDsMap(256) - s.writer.nextID = offset + s.writer.nextID = 0 return s, nil } @@ -92,15 +91,14 @@ func (s *memSegment) IndexConcurrency() int { return 1 } -func (s *memSegment) Reset(offset postings.ID) { +func (s *memSegment) Reset() { s.state.Lock() defer s.state.Unlock() s.state.sealed = false - s.offset = int(offset) s.termsDict.Reset() - s.readerID = postings.NewAtomicID(offset) + s.readerID = postings.NewAtomicID(0) var empty doc.Document for i := range s.docs.data { @@ -109,20 +107,13 @@ func (s *memSegment) Reset(offset postings.ID) { s.docs.data = s.docs.data[:0] s.writer.idSet.Reset() - s.writer.nextID = offset -} - -func (s *memSegment) Offset() postings.ID { - s.state.RLock() - offset := postings.ID(s.offset) - s.state.RUnlock() - return offset + s.writer.nextID = 0 } func (s *memSegment) Size() int64 { s.state.RLock() closed := s.state.closed - size := int64(s.readerID.Load()) - int64(s.offset) + size := int64(s.readerID.Load()) s.state.RUnlock() if closed { return 0 @@ -326,7 +317,7 @@ func (s *memSegment) indexDocWithStateLock(id postings.ID, d doc.Document) error // storeDocWithStateLock stores a documents into the segment's mapping of postings // IDs to documents. It must be called with the segment's state lock. 
func (s *memSegment) storeDocWithStateLock(id postings.ID, d doc.Document) { - idx := int(id) - s.offset + idx := int(id) // Can return early if we have sufficient capacity. { @@ -371,7 +362,7 @@ func (s *memSegment) Reader() (segment.Reader, error) { } limits := readerDocRange{ - startInclusive: postings.ID(s.offset), + startInclusive: postings.ID(0), endExclusive: s.readerID.Load(), } return newReader(s, limits, s.plPool), nil @@ -412,7 +403,7 @@ func (s *memSegment) getDoc(id postings.ID) (doc.Document, error) { return doc.Document{}, segment.ErrClosed } - idx := int(id) - s.offset + idx := int(id) s.docs.RLock() if idx >= len(s.docs.data) { diff --git a/src/m3ninx/index/segment/mem/segment_bench_test.go b/src/m3ninx/index/segment/mem/segment_bench_test.go index f3f7f8dbd3..ee344f17a9 100644 --- a/src/m3ninx/index/segment/mem/segment_bench_test.go +++ b/src/m3ninx/index/segment/mem/segment_bench_test.go @@ -70,7 +70,7 @@ func benchmarkInsertSegment(docs []doc.Document, b *testing.B) { for n := 0; n < b.N; n++ { b.StopTimer() - s, err := NewSegment(0, NewOptions()) + s, err := NewSegment(NewOptions()) if err != nil { b.Fatalf("unable to construct new segment: %v", err) } @@ -85,7 +85,7 @@ func benchmarkInsertSegment(docs []doc.Document, b *testing.B) { func benchmarkMatchTermSegment(docs []doc.Document, b *testing.B) { b.ReportAllocs() - sgmt, err := NewSegment(0, NewOptions()) + sgmt, err := NewSegment(NewOptions()) if err != nil { b.Fatalf("unable to construct new segment: %v", err) } @@ -107,7 +107,7 @@ func benchmarkMatchTermSegment(docs []doc.Document, b *testing.B) { func benchmarkMatchRegexSegment(docs []doc.Document, b *testing.B) { b.ReportAllocs() - sgmt, err := NewSegment(0, NewOptions()) + sgmt, err := NewSegment(NewOptions()) if err != nil { b.Fatalf("unable to construct new segment: %v", err) } diff --git a/src/m3ninx/index/segment/mem/segment_test.go b/src/m3ninx/index/segment/mem/segment_test.go index 5612d55446..60137213ca 100644 --- 
a/src/m3ninx/index/segment/mem/segment_test.go +++ b/src/m3ninx/index/segment/mem/segment_test.go @@ -107,7 +107,7 @@ func TestSegmentInsert(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - segment, err := NewSegment(0, testOptions) + segment, err := NewSegment(testOptions) require.NoError(t, err) require.Equal(t, int64(0), segment.Size()) @@ -169,7 +169,7 @@ func TestSegmentInsertDuplicateID(t *testing.T) { } ) - segment, err := NewSegment(0, testOptions) + segment, err := NewSegment(testOptions) require.NoError(t, err) require.Equal(t, int64(0), segment.Size()) @@ -245,7 +245,7 @@ func TestSegmentInsertBatch(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - segment, err := NewSegment(0, testOptions) + segment, err := NewSegment(testOptions) require.NoError(t, err) require.Equal(t, int64(0), segment.Size()) @@ -306,7 +306,7 @@ func TestSegmentInsertBatchError(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - segment, err := NewSegment(0, testOptions) + segment, err := NewSegment(testOptions) require.Equal(t, int64(0), segment.Size()) require.NoError(t, err) @@ -394,7 +394,7 @@ func TestSegmentInsertBatchPartialError(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - segment, err := NewSegment(0, testOptions) + segment, err := NewSegment(testOptions) require.NoError(t, err) require.Equal(t, int64(0), segment.Size()) @@ -460,7 +460,7 @@ func TestSegmentInsertBatchPartialErrorInvalidDoc(t *testing.T) { }, index.AllowPartialUpdates(), ) - segment, err := NewSegment(0, testOptions) + segment, err := NewSegment(testOptions) require.NoError(t, err) err = segment.InsertBatch(b1) @@ -514,7 +514,7 @@ func TestSegmentContainsID(t *testing.T) { }, index.AllowPartialUpdates(), ) - segment, err := NewSegment(0, testOptions) + segment, err := NewSegment(testOptions) require.NoError(t, err) ok, err := segment.ContainsID([]byte("abc")) 
require.NoError(t, err) @@ -573,7 +573,7 @@ func TestSegmentContainsField(t *testing.T) { }, } b1 := index.NewBatch(docs, index.AllowPartialUpdates()) - segment, err := NewSegment(0, testOptions) + segment, err := NewSegment(testOptions) require.NoError(t, err) err = segment.InsertBatch(b1) @@ -642,7 +642,7 @@ func TestSegmentInsertBatchPartialErrorAlreadyIndexing(t *testing.T) { }, index.AllowPartialUpdates()) - segment, err := NewSegment(0, testOptions) + segment, err := NewSegment(testOptions) require.NoError(t, err) err = segment.InsertBatch(b1) @@ -697,7 +697,7 @@ func TestSegmentReaderMatchExact(t *testing.T) { }, } - segment, err := NewSegment(0, testOptions) + segment, err := NewSegment(testOptions) require.NoError(t, err) for _, doc := range docs { @@ -736,7 +736,7 @@ func TestSegmentReaderMatchExact(t *testing.T) { } func TestSegmentSealLifecycle(t *testing.T) { - segment, err := NewSegment(0, testOptions) + segment, err := NewSegment(testOptions) require.NoError(t, err) err = segment.Seal() @@ -747,7 +747,7 @@ func TestSegmentSealLifecycle(t *testing.T) { } func TestSegmentSealCloseLifecycle(t *testing.T) { - segment, err := NewSegment(0, testOptions) + segment, err := NewSegment(testOptions) require.NoError(t, err) require.NoError(t, segment.Close()) @@ -756,7 +756,7 @@ func TestSegmentSealCloseLifecycle(t *testing.T) { } func TestSegmentIsSealed(t *testing.T) { - segment, err := NewSegment(0, testOptions) + segment, err := NewSegment(testOptions) require.NoError(t, err) require.False(t, segment.IsSealed()) @@ -770,7 +770,7 @@ func TestSegmentIsSealed(t *testing.T) { } func TestSegmentFields(t *testing.T) { - segment, err := NewSegment(0, testOptions) + segment, err := NewSegment(testOptions) require.NoError(t, err) knownsFields := map[string]struct{}{} @@ -796,7 +796,7 @@ func TestSegmentFields(t *testing.T) { } func TestSegmentTerms(t *testing.T) { - segment, err := NewSegment(0, testOptions) + segment, err := NewSegment(testOptions) 
require.NoError(t, err) knownsFields := map[string]map[string]struct{}{} @@ -829,7 +829,7 @@ func TestSegmentTerms(t *testing.T) { func TestSegmentReaderMatchRegex(t *testing.T) { docs := testDocuments - segment, err := NewSegment(0, testOptions) + segment, err := NewSegment(testOptions) require.NoError(t, err) for _, doc := range docs { diff --git a/src/m3ninx/index/segment/segment_mock.go b/src/m3ninx/index/segment/segment_mock.go index 7a42de8c76..4721f195c2 100644 --- a/src/m3ninx/index/segment/segment_mock.go +++ b/src/m3ninx/index/segment/segment_mock.go @@ -997,15 +997,15 @@ func (mr *MockMutableSegmentMockRecorder) Terms(field interface{}) *gomock.Call } // Reset mocks base method -func (m *MockMutableSegment) Reset(offset postings.ID) { +func (m *MockMutableSegment) Reset() { m.ctrl.T.Helper() - m.ctrl.Call(m, "Reset", offset) + m.ctrl.Call(m, "Reset") } // Reset indicates an expected call of Reset -func (mr *MockMutableSegmentMockRecorder) Reset(offset interface{}) *gomock.Call { +func (mr *MockMutableSegmentMockRecorder) Reset() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockMutableSegment)(nil).Reset), offset) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockMutableSegment)(nil).Reset)) } // Docs mocks base method @@ -1107,20 +1107,6 @@ func (mr *MockMutableSegmentMockRecorder) Fields() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Fields", reflect.TypeOf((*MockMutableSegment)(nil).Fields)) } -// Offset mocks base method -func (m *MockMutableSegment) Offset() postings.ID { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Offset") - ret0, _ := ret[0].(postings.ID) - return ret0 -} - -// Offset indicates an expected call of Offset -func (mr *MockMutableSegmentMockRecorder) Offset() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Offset", 
reflect.TypeOf((*MockMutableSegment)(nil).Offset)) -} - // Seal mocks base method func (m *MockMutableSegment) Seal() error { m.ctrl.T.Helper() @@ -1341,15 +1327,15 @@ func (mr *MockBuilderMockRecorder) Terms(field interface{}) *gomock.Call { } // Reset mocks base method -func (m *MockBuilder) Reset(offset postings.ID) { +func (m *MockBuilder) Reset() { m.ctrl.T.Helper() - m.ctrl.Call(m, "Reset", offset) + m.ctrl.Call(m, "Reset") } // Reset indicates an expected call of Reset -func (mr *MockBuilderMockRecorder) Reset(offset interface{}) *gomock.Call { +func (mr *MockBuilderMockRecorder) Reset() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockBuilder)(nil).Reset), offset) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockBuilder)(nil).Reset)) } // Docs mocks base method @@ -1435,15 +1421,15 @@ func (mr *MockDocumentsBuilderMockRecorder) Terms(field interface{}) *gomock.Cal } // Reset mocks base method -func (m *MockDocumentsBuilder) Reset(offset postings.ID) { +func (m *MockDocumentsBuilder) Reset() { m.ctrl.T.Helper() - m.ctrl.Call(m, "Reset", offset) + m.ctrl.Call(m, "Reset") } // Reset indicates an expected call of Reset -func (mr *MockDocumentsBuilderMockRecorder) Reset(offset interface{}) *gomock.Call { +func (mr *MockDocumentsBuilderMockRecorder) Reset() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockDocumentsBuilder)(nil).Reset), offset) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockDocumentsBuilder)(nil).Reset)) } // Docs mocks base method @@ -1584,15 +1570,15 @@ func (mr *MockCloseableDocumentsBuilderMockRecorder) Terms(field interface{}) *g } // Reset mocks base method -func (m *MockCloseableDocumentsBuilder) Reset(offset postings.ID) { +func (m *MockCloseableDocumentsBuilder) Reset() { m.ctrl.T.Helper() - m.ctrl.Call(m, "Reset", 
offset) + m.ctrl.Call(m, "Reset") } // Reset indicates an expected call of Reset -func (mr *MockCloseableDocumentsBuilderMockRecorder) Reset(offset interface{}) *gomock.Call { +func (mr *MockCloseableDocumentsBuilderMockRecorder) Reset() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockCloseableDocumentsBuilder)(nil).Reset), offset) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockCloseableDocumentsBuilder)(nil).Reset)) } // Docs mocks base method @@ -1747,15 +1733,15 @@ func (mr *MockSegmentsBuilderMockRecorder) Terms(field interface{}) *gomock.Call } // Reset mocks base method -func (m *MockSegmentsBuilder) Reset(offset postings.ID) { +func (m *MockSegmentsBuilder) Reset() { m.ctrl.T.Helper() - m.ctrl.Call(m, "Reset", offset) + m.ctrl.Call(m, "Reset") } // Reset indicates an expected call of Reset -func (mr *MockSegmentsBuilderMockRecorder) Reset(offset interface{}) *gomock.Call { +func (mr *MockSegmentsBuilderMockRecorder) Reset() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockSegmentsBuilder)(nil).Reset), offset) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reset", reflect.TypeOf((*MockSegmentsBuilder)(nil).Reset)) } // Docs mocks base method diff --git a/src/m3ninx/index/segment/types.go b/src/m3ninx/index/segment/types.go index d39a5f106c..566a25bd00 100644 --- a/src/m3ninx/index/segment/types.go +++ b/src/m3ninx/index/segment/types.go @@ -166,9 +166,6 @@ type MutableSegment interface { // builder by inserting more documents. Fields() (FieldsIterator, error) - // Offset returns the postings offset. - Offset() postings.ID - // Seal marks the Mutable Segment immutable. Seal() error @@ -189,7 +186,7 @@ type Builder interface { TermsIterable // Reset resets the builder for reuse. 
- Reset(offset postings.ID) + Reset() // Docs returns the current docs slice, this is not safe to modify // and is invalidated on a call to reset. diff --git a/src/m3ninx/persist/writer.go b/src/m3ninx/persist/writer.go index f709c8be24..23bab30e0a 100644 --- a/src/m3ninx/persist/writer.go +++ b/src/m3ninx/persist/writer.go @@ -186,7 +186,7 @@ func (w *fstSegmentDataWriter) WriteFile(fileType IndexSegmentFileType, iow io.W func (w *fstSegmentDataWriter) writeDocsData(iow io.Writer) error { if r := w.data.DocsReader; r != nil { - iter := r.AllDocs() + iter := r.Iter() closer := x.NewSafeCloser(iter) defer closer.Close() w.docsWriter.Reset(fst.DocumentsWriterOptions{ diff --git a/src/m3ninx/search/proptest/segment_gen.go b/src/m3ninx/search/proptest/segment_gen.go index 842c3d7059..dbc6ad9e43 100644 --- a/src/m3ninx/search/proptest/segment_gen.go +++ b/src/m3ninx/search/proptest/segment_gen.go @@ -29,7 +29,6 @@ import ( "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/index/segment/fst" "github.com/m3db/m3/src/m3ninx/index/segment/mem" - "github.com/m3db/m3/src/m3ninx/postings" "github.com/leanovate/gopter" "github.com/leanovate/gopter/gen" @@ -56,7 +55,7 @@ func collectDocs(iter doc.Iterator) ([]doc.Document, error) { func newTestMemSegment(t *testing.T, docs []doc.Document) segment.MutableSegment { opts := mem.NewOptions() - s, err := mem.NewSegment(postings.ID(0), opts) + s, err := mem.NewSegment(opts) require.NoError(t, err) for _, d := range docs { _, err := s.Insert(d) @@ -68,8 +67,7 @@ func newTestMemSegment(t *testing.T, docs []doc.Document) segment.MutableSegment func (i propTestInput) generate(t *testing.T, docs []doc.Document) []segment.Segment { var result []segment.Segment for j := 0; j < len(i.segments); j++ { - initialOffset := postings.ID(i.segments[j].initialDocIDOffset) - s, err := mem.NewSegment(initialOffset, memOptions) + s, err := mem.NewSegment(memOptions) require.NoError(t, err) for k := 0; k < len(i.docIds[j]); k++ 
{ idx := i.docIds[j][k] @@ -145,8 +143,7 @@ func genPropTestInput(numDocs int) gopter.Gen { func genSegment() gopter.Gen { return gopter.CombineGens( - gen.Bool(), // simple segment - gen.IntRange(1, 5), // initial doc id offset + gen.Bool(), // simple segment ).Map(func(val interface{}) generatedSegment { var inputs []interface{} if x, ok := val.(*gopter.GenResult); ok { @@ -159,15 +156,13 @@ func genSegment() gopter.Gen { inputs = val.([]interface{}) } return generatedSegment{ - simpleSegment: inputs[0].(bool), - initialDocIDOffset: inputs[1].(int), + simpleSegment: inputs[0].(bool), } }) } type generatedSegment struct { - simpleSegment bool - initialDocIDOffset int + simpleSegment bool } type randomDocIds []int