Commit
[dbnode] Remove unused index offset and remove bad map[time.Time] usage (#2537)
robskillington authored Sep 3, 2020
1 parent c036ebf commit 225a0c4
Showing 28 changed files with 169 additions and 190 deletions.
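
Context for the map[time.Time] half of the change: time.Time compares by its full internal representation, including any monotonic clock reading and the location pointer, so two values naming the same instant can be != and land on different map keys. A minimal standalone sketch of the pitfall and of the integer-keyed fix (a plain int64 stands in here for m3's xtime.UnixNano type):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// time.Now() carries a monotonic clock reading; Round(0) strips it.
	// Both values name the same instant, yet they are not == and so
	// hash to different map keys.
	now := time.Now()
	same := now.Round(0)

	byTime := map[time.Time]int{now: 1}
	fmt.Println(byTime[same]) // prints 0: the lookup misses

	// Keying by the instant's integer form avoids the trap; this is the
	// shape of the map[xtime.UnixNano] maps this commit switches to.
	byNanos := map[int64]int{now.UnixNano(): 1}
	fmt.Println(byNanos[same.UnixNano()]) // prints 1: the lookup hits
}
```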
12 changes: 7 additions & 5 deletions src/dbnode/persist/fs/seek_manager_test.go
@@ -457,7 +457,7 @@ func TestSeekerManagerAssignShardSet(t *testing.T) {
var (
wg sync.WaitGroup
mockSeekerStatsLock sync.Mutex
-numMockSeekerClosesByShardAndBlockStart = make(map[uint32]map[time.Time]int)
+numMockSeekerClosesByShardAndBlockStart = make(map[uint32]map[xtime.UnixNano]int)
)
m.newOpenSeekerFn = func(
shard uint32,
@@ -476,10 +476,10 @@ func TestSeekerManagerAssignShardSet(t *testing.T) {
mockSeekerStatsLock.Lock()
numMockSeekerClosesByBlockStart, ok := numMockSeekerClosesByShardAndBlockStart[shard]
if !ok {
-numMockSeekerClosesByBlockStart = make(map[time.Time]int)
+numMockSeekerClosesByBlockStart = make(map[xtime.UnixNano]int)
numMockSeekerClosesByShardAndBlockStart[shard] = numMockSeekerClosesByBlockStart
}
-numMockSeekerClosesByBlockStart[blockStart]++
+numMockSeekerClosesByBlockStart[xtime.ToUnixNano(blockStart)]++
mockSeekerStatsLock.Unlock()
wg.Done()
return nil
@@ -526,7 +526,9 @@ func TestSeekerManagerAssignShardSet(t *testing.T) {

mockSeekerStatsLock.Lock()
for _, numMockSeekerClosesByBlockStart := range numMockSeekerClosesByShardAndBlockStart {
-require.Equal(t, defaultTestingFetchConcurrency, numMockSeekerClosesByBlockStart[blockStart])
+require.Equal(t,
+defaultTestingFetchConcurrency,
+numMockSeekerClosesByBlockStart[xtime.ToUnixNano(blockStart)])
}
mockSeekerStatsLock.Unlock()

@@ -548,7 +550,7 @@ func TestSeekerManagerAssignShardSet(t *testing.T) {
mockSeekerStatsLock.Lock()
for _, numMockSeekerClosesByBlockStart := range numMockSeekerClosesByShardAndBlockStart {
for start, numMockSeekerCloses := range numMockSeekerClosesByBlockStart {
-if blockStart == start {
+if xtime.ToUnixNano(blockStart) == start {
// NB(bodu): These get closed twice since they've been closed once already due to updating their block lease.
require.Equal(t, defaultTestingFetchConcurrency*2, numMockSeekerCloses)
continue
2 changes: 1 addition & 1 deletion src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
@@ -334,7 +334,7 @@ func (s *fileSystemSource) bootstrapFromReaders(
for timeWindowReaders := range readersCh {
// NB(bodu): Since we are re-using the same builder for all bootstrapped index blocks,
// it is not thread safe and requires reset after every processed index block.
-builder.Builder().Reset(0)
+builder.Builder().Reset()

s.loadShardReadersDataIntoShardResult(run, ns, accumulator,
runOpts, runResult, resultOpts, timeWindowReaders, readerPool, builder)
@@ -145,7 +145,7 @@ func writeTSDBPersistedIndexBlock(
shards map[uint32]struct{},
block []testSeries,
) {
-seg, err := mem.NewSegment(0, mem.NewOptions())
+seg, err := mem.NewSegment(mem.NewOptions())
require.NoError(t, err)

for _, series := range block {
2 changes: 1 addition & 1 deletion src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
@@ -712,7 +712,7 @@ func (s *peersSource) readIndex(
for timeWindowReaders := range readersCh {
// NB(bodu): Since we are re-using the same builder for all bootstrapped index blocks,
// it is not thread safe and requires reset after every processed index block.
-builder.Builder().Reset(0)
+builder.Builder().Reset()

// NB(bodu): This is fetching the data for all shards for a block of time.
remainingRanges, timesWithErrors := s.processReaders(
6 changes: 3 additions & 3 deletions src/dbnode/storage/index.go
@@ -1168,7 +1168,7 @@ func (i *nsIndex) flushBlockSegment(
builder segment.DocumentsBuilder,
) error {
// Reset the builder
-builder.Reset(0)
+builder.Reset()

var (
batch = m3ninxindex.Batch{AllowPartialUpdates: true}
@@ -1895,10 +1895,10 @@ func (i *nsIndex) CleanupDuplicateFileSets() error {
fsOpts.InfoReaderBufferSize(),
)

-segmentsOrderByVolumeIndexByVolumeTypeAndBlockStart := make(map[time.Time]map[idxpersist.IndexVolumeType][]fs.Segments)
+segmentsOrderByVolumeIndexByVolumeTypeAndBlockStart := make(map[xtime.UnixNano]map[idxpersist.IndexVolumeType][]fs.Segments)
for _, file := range infoFiles {
seg := fs.NewSegments(file.Info, file.ID.VolumeIndex, file.AbsoluteFilePaths)
-blockStart := seg.BlockStart()
+blockStart := xtime.ToUnixNano(seg.BlockStart())
segmentsOrderByVolumeIndexByVolumeType, ok := segmentsOrderByVolumeIndexByVolumeTypeAndBlockStart[blockStart]
if !ok {
segmentsOrderByVolumeIndexByVolumeType = make(map[idxpersist.IndexVolumeType][]fs.Segments)
2 changes: 1 addition & 1 deletion src/dbnode/storage/index/block_test.go
@@ -2117,7 +2117,7 @@ func assertAggregateResultsMapEquals(t *testing.T, expected map[string][]string,
}

func testSegment(t *testing.T, docs ...doc.Document) segment.Segment {
-seg, err := mem.NewSegment(0, testOpts.MemSegmentOptions())
+seg, err := mem.NewSegment(testOpts.MemSegmentOptions())
require.NoError(t, err)

for _, d := range docs {
6 changes: 3 additions & 3 deletions src/dbnode/storage/index/compaction/compactor.go
@@ -114,7 +114,7 @@ func (c *Compactor) Compact(
return nil, errCompactorClosed
}

-c.builder.Reset(0)
+c.builder.Reset()
if err := c.builder.AddSegments(segs); err != nil {
return nil, err
}
@@ -236,7 +236,7 @@ func (c *Compactor) compactFromBuilderWithLock(
// Release resources regardless of result,
// otherwise old compacted segments are held onto
// strongly
-builder.Reset(0)
+builder.Reset()
}()

// Since this builder is likely reused between compaction
@@ -275,7 +275,7 @@ func (c *Compactor) compactFromBuilderWithLock(
// rather than encoding them and mmap'ing the encoded documents.
allDocsCopy := make([]doc.Document, len(allDocs))
copy(allDocsCopy, allDocs)
-fstData.DocsReader = docs.NewSliceReader(0, allDocsCopy)
+fstData.DocsReader = docs.NewSliceReader(allDocsCopy)
} else {
// Otherwise encode and reference the encoded bytes as mmap'd bytes.
c.buff.Reset()
12 changes: 6 additions & 6 deletions src/dbnode/storage/index/compaction/compactor_test.go
@@ -81,7 +81,7 @@ func init() {
}

func TestCompactorSingleMutableSegment(t *testing.T) {
-seg, err := mem.NewSegment(0, testMemSegmentOptions)
+seg, err := mem.NewSegment(testMemSegmentOptions)
require.NoError(t, err)

_, err = seg.Insert(testDocuments[0])
@@ -105,7 +105,7 @@ func TestCompactorSingleMutableSegment(t *testing.T) {
}

func TestCompactorSingleMutableSegmentWithMmapDocsData(t *testing.T) {
-seg, err := mem.NewSegment(0, testMemSegmentOptions)
+seg, err := mem.NewSegment(testMemSegmentOptions)
require.NoError(t, err)

_, err = seg.Insert(testDocuments[0])
@@ -131,13 +131,13 @@ func TestCompactorSingleMutableSegmentWithMmapDocsData(t *testing.T) {
}

func TestCompactorManySegments(t *testing.T) {
-seg1, err := mem.NewSegment(0, testMemSegmentOptions)
+seg1, err := mem.NewSegment(testMemSegmentOptions)
require.NoError(t, err)

_, err = seg1.Insert(testDocuments[0])
require.NoError(t, err)

-seg2, err := mem.NewSegment(0, testMemSegmentOptions)
+seg2, err := mem.NewSegment(testMemSegmentOptions)
require.NoError(t, err)

_, err = seg2.Insert(testDocuments[1])
@@ -159,13 +159,13 @@ func TestCompactorManySegments(t *testing.T) {
}

func TestCompactorCompactDuplicateIDsNoError(t *testing.T) {
-seg1, err := mem.NewSegment(0, testMemSegmentOptions)
+seg1, err := mem.NewSegment(testMemSegmentOptions)
require.NoError(t, err)

_, err = seg1.Insert(testDocuments[0])
require.NoError(t, err)

-seg2, err := mem.NewSegment(0, testMemSegmentOptions)
+seg2, err := mem.NewSegment(testMemSegmentOptions)
require.NoError(t, err)

_, err = seg2.Insert(testDocuments[0])
2 changes: 1 addition & 1 deletion src/dbnode/storage/index/fields_terms_iterator_test.go
@@ -222,7 +222,7 @@ func TestFieldsTermsIteratorIterateTermsAndRestrictByQuery(t *testing.T) {
},
}

-seg, err := mem.NewSegment(0, mem.NewOptions())
+seg, err := mem.NewSegment(mem.NewOptions())
require.NoError(t, err)

require.NoError(t, seg.InsertBatch(m3ninxindex.Batch{
4 changes: 2 additions & 2 deletions src/dbnode/storage/index/mutable_segments.go
@@ -171,7 +171,7 @@ func (m *mutableSegments) WriteBatch(inserts *WriteBatch) error {
m.Unlock()
}()

-builder.Reset(0)
+builder.Reset()
insertResultErr := builder.InsertBatch(m3ninxindex.Batch{
Docs: inserts.PendingDocs(),
AllowPartialUpdates: true,
@@ -621,7 +621,7 @@ func (m *mutableSegments) foregroundCompactWithBuilder(builder segment.Documents
return errForegroundCompactorBadPlanSecondaryTask
}
// Now use the builder after resetting it.
builder.Reset(0)
builder.Reset()
if err := m.foregroundCompactWithTask(
builder, task,
log, logger.With(zap.Int("task", i)),
11 changes: 3 additions & 8 deletions src/m3ninx/index/segment/builder/builder.go
@@ -177,8 +177,6 @@ type builder struct {
opts Options
newUUIDFn util.NewUUIDFn

-offset postings.ID

batchSizeOne index.Batch
docs []doc.Document
idSet *IDsMap
@@ -286,12 +284,10 @@ func (b *builder) IndexConcurrency() int {
return b.concurrency
}

-func (b *builder) Reset(offset postings.ID) {
+func (b *builder) Reset() {
b.status.Lock()
defer b.status.Unlock()

-b.offset = offset

// Reset the documents slice.
var empty doc.Document
for i := range b.docs {
@@ -485,16 +481,15 @@ func (b *builder) AllDocs() (index.IDDocIterator, error) {
b.status.RLock()
defer b.status.RUnlock()

-rangeIter := postings.NewRangeIterator(b.offset,
-b.offset+postings.ID(len(b.docs)))
+rangeIter := postings.NewRangeIterator(0, postings.ID(len(b.docs)))
return index.NewIDDocIterator(b, rangeIter), nil
}

func (b *builder) Doc(id postings.ID) (doc.Document, error) {
b.status.RLock()
defer b.status.RUnlock()

-idx := int(id - b.offset)
+idx := int(id)
if idx < 0 || idx >= len(b.docs) {
return doc.Document{}, errDocNotFound
}
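
The offset parameter removed above existed so a builder's postings IDs could start somewhere other than zero, but every call site passed Reset(0), so IDs were always dense from zero; the change makes that invariant explicit. A reduced standalone sketch of the resulting lookup and iteration (string stands in for doc.Document, int for postings.ID; the names are illustrative, not the m3ninx API):

```go
package main

import (
	"errors"
	"fmt"
)

// sliceBuilder is a cut-down stand-in for the builder: with the offset
// fixed at zero, a postings ID is simply an index into the docs slice.
type sliceBuilder struct {
	docs []string
}

func (b *sliceBuilder) Doc(id int) (string, error) {
	if id < 0 || id >= len(b.docs) {
		return "", errors.New("doc not found")
	}
	return b.docs[id], nil
}

func main() {
	b := &sliceBuilder{docs: []string{"doc-a", "doc-b"}}
	// Walking [0, len(docs)) replaces the old [offset, offset+len) range.
	for id := 0; id < len(b.docs); id++ {
		d, _ := b.Doc(id)
		fmt.Println(id, d)
	}
}
```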
4 changes: 2 additions & 2 deletions src/m3ninx/index/segment/builder/builder_test.go
@@ -85,7 +85,7 @@ func TestBuilderFields(t *testing.T) {
}()

for i := 0; i < 10; i++ {
-builder.Reset(0)
+builder.Reset()

knownsFields := map[string]struct{}{}
for _, d := range testDocuments {
@@ -115,7 +115,7 @@ func TestBuilderTerms(t *testing.T) {
}()

for i := 0; i < 10; i++ {
-builder.Reset(0)
+builder.Reset()

knownsFields := map[string]map[string]struct{}{}
for _, d := range testDocuments {
10 changes: 3 additions & 7 deletions src/m3ninx/index/segment/builder/multi_segments_builder.go
@@ -36,7 +36,6 @@ type builderFromSegments struct {
idSet *IDsMap
segments []segmentMetadata
termsIter *termsIterFromSegments
-offset postings.ID
segmentsOffset postings.ID
}

@@ -60,9 +59,7 @@ func NewBuilderFromSegments(opts Options) segment.SegmentsBuilder {
}
}

-func (b *builderFromSegments) Reset(offset postings.ID) {
-b.offset = offset
-
+func (b *builderFromSegments) Reset() {
// Reset the documents slice
var emptyDoc doc.Document
for i := range b.docs {
@@ -152,13 +149,12 @@ func (b *builderFromSegments) Docs() []doc.Document {
}

func (b *builderFromSegments) AllDocs() (index.IDDocIterator, error) {
-rangeIter := postings.NewRangeIterator(b.offset,
-b.offset+postings.ID(len(b.docs)))
+rangeIter := postings.NewRangeIterator(0, postings.ID(len(b.docs)))
return index.NewIDDocIterator(b, rangeIter), nil
}

func (b *builderFromSegments) Doc(id postings.ID) (doc.Document, error) {
-idx := int(id - b.offset)
+idx := int(id)
if idx < 0 || idx >= len(b.docs) {
return doc.Document{}, errDocNotFound
}
@@ -153,7 +153,7 @@ func newTestSegmentWithDocs(
t *testing.T,
docs []doc.Document,
) segment.Segment {
-seg, err := mem.NewSegment(0, testMemOptions)
+seg, err := mem.NewSegment(testMemOptions)
require.NoError(t, err)

defer func() {
@@ -104,7 +104,7 @@ func TestFieldPostingsListIterFromSegments(t *testing.T) {
}),
}
builder := NewBuilderFromSegments(testOptions)
-builder.Reset(0)
+builder.Reset()

b, ok := builder.(*builderFromSegments)
require.True(t, ok)
@@ -80,7 +80,7 @@ func TestTermsIterFromSegmentsDeduplicates(t *testing.T) {
}

builder := NewBuilderFromSegments(testOptions)
-builder.Reset(0)
+builder.Reset()
require.NoError(t, builder.AddSegments(segments))
iter, err := builder.Terms([]byte("fruit"))
require.NoError(t, err)
22 changes: 8 additions & 14 deletions src/m3ninx/index/segment/fst/encoding/docs/slice.go
@@ -32,23 +32,18 @@ var (
errDocNotFound = errors.New("doc not found")
)

var _ Reader = (*SliceReader)(nil)
var _ index.DocRetriever = (*SliceReader)(nil)

// SliceReader is a docs slice reader for use with documents
// stored in memory.
type SliceReader struct {
-offset postings.ID
-docs   []doc.Document
+docs []doc.Document
}

// NewSliceReader returns a new docs slice reader.
-func NewSliceReader(offset postings.ID, docs []doc.Document) *SliceReader {
-return &SliceReader{offset: offset, docs: docs}
-}
-
-// Base returns the postings ID base offset of the slice reader.
-func (r *SliceReader) Base() postings.ID {
-return r.offset
+func NewSliceReader(docs []doc.Document) *SliceReader {
+return &SliceReader{docs: docs}
}

// Len returns the number of documents in the slice reader.
@@ -58,7 +53,7 @@ func (r *SliceReader) Len() int {

// Read returns a document from the docs slice reader.
func (r *SliceReader) Read(id postings.ID) (doc.Document, error) {
-idx := int(id - r.offset)
+idx := int(id)
if idx < 0 || idx >= len(r.docs) {
return doc.Document{}, errDocNotFound
}
@@ -71,9 +66,8 @@ func (r *SliceReader) Doc(id postings.ID) (doc.Document, error) {
return r.Read(id)
}

-// AllDocs returns a docs iterator.
-func (r *SliceReader) AllDocs() index.IDDocIterator {
-postingsIter := postings.NewRangeIterator(r.offset,
-r.offset+postings.ID(r.Len()))
+// Iter returns a docs iterator.
+func (r *SliceReader) Iter() index.IDDocIterator {
+postingsIter := postings.NewRangeIterator(0, postings.ID(r.Len()))
return index.NewIDDocIterator(r, postingsIter)
}
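
With the offset gone, SliceReader's iterator always walks the half-open ID range [0, Len()). A toy mirror of that iterator contract (illustrative names, not the m3ninx postings API):

```go
package main

import "fmt"

// rangeIter is a reduced stand-in for postings.NewRangeIterator(0, n):
// it yields IDs 0 through n-1 in order, which is exactly the walk the
// simplified SliceReader.Iter performs over its docs slice.
type rangeIter struct {
	cur, end int
}

func newRangeIter(start, end int) *rangeIter {
	return &rangeIter{cur: start - 1, end: end}
}

func (it *rangeIter) Next() bool   { it.cur++; return it.cur < it.end }
func (it *rangeIter) Current() int { return it.cur }

func main() {
	docs := []string{"apple", "banana", "carrot"}
	// Pairing the reader with a [0, len(docs)) iterator means the
	// current ID doubles as the slice index: no offset arithmetic.
	for it := newRangeIter(0, len(docs)); it.Next(); {
		id := it.Current()
		fmt.Println(id, docs[id])
	}
}
```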