[dbnode] Use Documents Builder And Persist Index Data In Peer BS #2078
…lder and persist after finishing.
// allocator for a bootstrap result index block given index options.
-func NewBootstrapResultMutableSegmentAllocator(
+func NewBootstrapResultDocumentsBuilderAllocator(
nit: Should we rename this NewBootstrapDocumentsBuilderAllocator(...)
to match the new type?
@@ -642,73 +648,123 @@ func (s *peersSource) readIndex(
		zap.Int("concurrency", concurrency),
	)

	groupFn := bootstrapper.NewShardTimeRangesTimeWindowGroups
	groupedByBlockSize := groupFn(shardsTimeRanges, dataBlockSize)
Hm, should this be index block size instead of data block size? Just comparing to the fs/source.go
implementation.
Hard to know; I see we used the data block size for fetching below.
if !inserted {
	// If the metadata wasn't inserted we finalize the metadata.
	dataBlock.Finalize()
metadata, err := session.FetchBootstrapBlocksMetadataFromPeers(ns.ID(),
So, I think for the peers indexing read side we can just index from whatever we just persisted to disk.
if exists {
	return false, nil
}

d, err := convert.FromMetric(dataBlock.ID, dataBlock.Tags)
TODO: Look into potentially pooling structs here.
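On the pooling TODO, a minimal sketch of the usual `sync.Pool` pattern, with a hypothetical `docEntry` standing in for whatever struct `convert.FromMetric` would otherwise allocate per series:

```go
package main

import (
	"fmt"
	"sync"
)

// docEntry is a hypothetical stand-in for a per-series struct built
// from dataBlock.ID and dataBlock.Tags; pooling it avoids one heap
// allocation per series during bootstrap.
type docEntry struct {
	ID   []byte
	Tags [][]byte
}

var docEntryPool = sync.Pool{
	New: func() interface{} { return &docEntry{} },
}

func getDocEntry() *docEntry {
	return docEntryPool.Get().(*docEntry)
}

func putDocEntry(d *docEntry) {
	// Reset before returning to the pool so stale data never leaks
	// into the next user of this entry.
	d.ID = d.ID[:0]
	d.Tags = d.Tags[:0]
	docEntryPool.Put(d)
}

func main() {
	d := getDocEntry()
	d.ID = append(d.ID, []byte("series-a")...)
	fmt.Println(string(d.ID))
	putDocEntry(d)
}
```

Note `sync.Pool` gives no reuse guarantees; the win only shows up under allocation pressure, so it is worth benchmarking before committing to it.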
Codecov Report
@@            Coverage Diff            @@
##             master    #2078   +/-   ##
=========================================
  Coverage          ?    63.4%
=========================================
  Files             ?      853
  Lines             ?    78234
  Branches          ?        0
=========================================
  Hits              ?    49612
  Misses            ?    24942
  Partials          ?     3680
=========================================
Continue to review full report at Codecov.
)
defer func() {
	docsPool.Put(batch)
}()
nit: Realize this can be abbreviated to defer docsPool.Put(batch)
(think this code was elsewhere, but just FYI).
wg             sync.WaitGroup
count          = len(shardsTimeRanges)
concurrency    = s.opts.DefaultShardConcurrency()
indexBlockSize = ns.Options().RetentionOptions().BlockSize()
This might need to be dataBlockSize? I think the index block size would come from ns.Options().IndexOptions().BlockSize().
currRange := xtime.Range{
	Start: blockStart,
	End:   blockStart.Add(size),
var (
nit: Could we move this logic into a sub-method? It's just a little harder to read when it's all inside a workers.Go(func() { /* lots of content here */ }).
Perhaps we could also deal with the error case inline in the inline func.
Something like:
workers.Go(func() {
defer wg.Done()
remainingRanges, timesWithErrors = s.processReaders(...)
s.markRunResultErrorsAndUnfulfilled(resultLock, r, requestedRanges,
remainingRanges, timesWithErrors)
})
}

// CreateFlushBatchFn creates a batch flushing fn for code reuse.
func CreateFlushBatchFn(
Could we perhaps add this as a method to an IndexBuilder
type that wraps the segment.DocumentsBuilder
?
i.e.
type IndexBuilders struct {
sync.Mutex
builders map[xtime.UnixNano]*IndexBuilder
}
func (b *IndexBuilders) GetOrAddBuilder(st time.Time) *IndexBuilder {
b.Lock()
defer b.Unlock()
// return a *IndexBuilder for the time
}
type IndexBuilder struct {
sync.Mutex
builder segment.DocumentsBuilder
}
func (b *IndexBuilder) FlushBatch(batch []doc.Document) ([]doc.Document, error) {
	if len(batch) == 0 {
		// Last flush might not have any docs enqueued.
		return batch, nil
	}
	b.Lock()
	err := b.builder.InsertBatch(index.Batch{
		Docs:                batch,
		AllowPartialUpdates: true,
	})
	b.Unlock()
	if err != nil {
		return batch, err
	}
	// Reset the batch for reuse.
	return batch[:0], nil
}
// Useful for tests.
	return
}
p.Lock()
Somewhat related commentary: Hopefully with Go 1.14 we can just use defer
everywhere to unlock locks with pretty much zero cost: https://twitter.com/bradfitz/status/1184547163235184640?lang=en
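For illustration, the defer-based locking pattern the linked thread is about, on a toy counter (hypothetical example, not code from this PR):

```go
package main

import (
	"fmt"
	"sync"
)

type counter struct {
	mu sync.Mutex
	n  int
}

// With cheap defers (Go 1.14+), the defer form costs roughly the same
// as a manually paired Unlock and can never be skipped by an early
// return or a panic between Lock and Unlock.
func (c *counter) inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
}

func main() {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.inc()
		}()
	}
	wg.Wait()
	fmt.Println(c.n)
}
```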
builder, ok := indexBuilders[xtime.ToUnixNano(blockStart)]
if !ok {
	// No-op if there is no index builder for this time block (nothing to persist).
	return nil
Hm, wonder if we should error out in this case? I would expect that if we call BuildBootstrapIndexSegment(...) we really do want to build the segment.
Maybe we should avoid this being a precondition.
Looks like we still need to return an error here I think?
builder, ok := indexBuilders[xtime.ToUnixNano(blockStart)]
if !ok {
	// No-op if there are no index builders for this time block (nothing to persist).
	return nil
Same here: wonder if we should error out in this case? I would expect that if we call PersistBootstrapIndexSegment(...) we really do want to build the segment.
Maybe we should avoid this being a precondition.
Looks like we still need to return an error here I think?
…er or not segment came from disk.
	idxOpts,
)
s.markRunResultErrorsAndUnfulfilled(resultLock, r, timeWindowReaders.Ranges,
	remainingRanges, timesWithErrors)
// NB(bodu): Since we are re-using the same builder for all bootstrapped index blocks,
// it is not thread safe and requires reset after every processed index block.
s.builder.Builder().Reset(0)
Here too, would call Reset just before processReaders(...)
@@ -183,6 +207,7 @@ func (s *peersSource) readData(
	shardRetrieverMgr block.DatabaseShardBlockRetrieverManager
	persistFlush      persist.FlushPreparer
	shouldPersist     = false
	// TODO(bodu): We should migrate to series.CacheLRU only.
Agreed
LGTM
What this PR does / why we need it:
Remove usage of mem segment in fs & peer bootstrapping.
Use documents builder and batch writes.
Multiplex building index blocks across threads in peer bootstrapping and persist index data after blocks are built.
Special notes for your reviewer:
Does this PR introduce a user-facing and/or backwards incompatible change?:
Does this PR require updating code package or user-facing documentation?: