Remove CacheAllMetadata caching policy #1110
```diff
@@ -263,13 +263,6 @@ func (b *dbBlock) HasMergeTarget() bool {
 	return hasMergeTarget
 }
 
-func (b *dbBlock) IsRetrieved() bool {
-	b.RLock()
-	retrieved := b.retriever == nil
-	b.RUnlock()
-	return retrieved
-}
-
 func (b *dbBlock) WasRetrievedFromDisk() bool {
 	b.RLock()
 	wasRetrieved := b.wasRetrievedFromDisk
```
Another random thing: I think we might be able to delete the `ResetRetrievable`/`OnRetrieveBlock` pair as well. Without the CacheAllMetadata policy, these two steps will never be separate, so you can probably combine it all into one function call. For example, in the series we do this:

```go
b.ResetRetrievable(startTime, blockSize, s.blockRetriever, metadata)
// Use s.id instead of id here, because id is finalized by the context whereas
// we rely on the G.C to reclaim s.id. This is important because the block will
// hold onto the id ref, and (if the LRU caching policy is enabled) the shard
// will need it later when the WiredList calls its OnEvictedFromWiredList method.
// Also note that ResetRetrievable will mark the block as not retrieved from disk,
// but OnRetrieveBlock will then properly mark it as retrieved from disk so subsequent
// calls to WasRetrievedFromDisk will return true.
b.OnRetrieveBlock(s.id, tags, startTime, segment)
```

It seems like that could just become

```go
b.Reset(startTime, blockSize, segment)
```

and then we could delete the `ResetRetrievable` and `OnRetrieveBlock` methods.
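A minimal sketch of what that combined `Reset` could look like, assuming only the `dbBlock` fields visible in this PR (`startTime`, `blockSize`, `segment`, `wasRetrievedFromDisk`); the real struct surely has more:

```go
// Hypothetical combined Reset: wires up the segment and marks the block as
// retrieved from disk in one call, replacing the ResetRetrievable +
// OnRetrieveBlock two-step. Field set is an assumption based on this diff.
func (b *dbBlock) Reset(start time.Time, blockSize time.Duration, segment ts.Segment) {
	b.Lock()
	defer b.Unlock()

	b.startTime = start
	b.blockSize = blockSize
	b.segment = segment

	// Without CacheAllMetadata there is no metadata-only state: a block that
	// holds its segment was by definition retrieved, so the flag that
	// OnRetrieveBlock used to set can be set here directly.
	// Note: if Reset is also used on non-retrieval paths, this flag would
	// need to become a parameter rather than being set unconditionally.
	b.wasRetrievedFromDisk = true
}
```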
```diff
@@ -168,10 +168,6 @@ type DatabaseBlock interface {
 	// merged during Stream().
 	HasMergeTarget() bool
 
-	// IsRetrieved returns whether the block is already retrieved. Only
-	// meaningful in the context of the CacheAllMetadata series caching policy.
-	IsRetrieved() bool
-
 	// WasRetrievedFromDisk returns whether the block was retrieved from storage.
 	WasRetrievedFromDisk() bool
 
```
Hm, shouldn't this be required for some cache policies (e.g. LRU/RecentlyRead)?

I trusted the comment! I believe the other caching policies use `WasRetrievedFromDisk`.

Yeah, I think that's right. Also, the only place where I see `IsRetrieved` being called is here:

```go
func (b *dbBufferBucket) merge() (mergeResult, error) {
	if !b.needsMerge() {
		// Save unnecessary work
		return mergeResult{}, nil
	}

	merges := 0
	bopts := b.opts.DatabaseBlockOptions()
	encoder := bopts.EncoderPool().Get()
	encoder.Reset(b.start, bopts.DatabaseBlockAllocSize())

	// If we have to merge bootstrapped from disk during a merge then this
	// can make ticking very slow, ensure to notify this bug
	if len(b.bootstrapped) > 0 {
		unretrieved := 0
		for i := range b.bootstrapped {
			if !b.bootstrapped[i].IsRetrieved() {
				unretrieved++
			}
		}
		if unretrieved > 0 {
			log := b.opts.InstrumentOptions().Logger()
			log.Warnf("buffer merging %d unretrieved blocks", unretrieved)
		}
	}
```

This is a very weird code path that I don't think we would ever follow at this point. If we want to be extra cautious, we could replace that `IsRetrieved()` call with `WasRetrievedFromDisk()`.

Yeah, let's remove it, don't think this is plausible now.
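For reference, the cautious variant mentioned above would be a sketch like this (assuming the same `dbBufferBucket` fields as the snippet above; this is not what the PR ended up doing, since the check was removed entirely):

```go
// Cautious variant: count bootstrapped blocks whose contents did not come
// from disk using WasRetrievedFromDisk, which this PR keeps, instead of the
// deleted IsRetrieved.
unretrieved := 0
for i := range b.bootstrapped {
	if !b.bootstrapped[i].WasRetrievedFromDisk() {
		unretrieved++
	}
}
```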
```diff
@@ -343,11 +343,8 @@ func (s *peersSource) logFetchBootstrapBlocksFromPeersOutcome(
 // the series will either be held in memory, or removed from memory once
 // flushing has completed.
 // Once everything has been flushed to disk then depending on the series
-// caching policy the function is either done, or in the case of the
-// CacheAllMetadata policy we loop through every series and make every block
-// retrievable (so that we can retrieve data for the blocks that we're caching
-// the metadata for).
-// In addition, if the caching policy is not CacheAll or CacheAllMetadata, then
+// caching policy the function is either done.
+// In addition, if the caching policy is not CacheAll, then
 // at the end we remove all the series objects from the shard result as well
 // (since all their corresponding blocks have been removed anyways) to prevent
 // a huge memory spike caused by adding lots of unused series to the Shard
```

This sentence terminates too early now.
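A possible rewording for that comment, keeping the surrounding style (phrasing is a suggestion, not taken from the PR):

```go
// Once everything has been flushed to disk then, depending on the series
// caching policy, the function is done.
```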
```diff
@@ -364,14 +361,10 @@ func (s *peersSource) flush(
 	var (
 		ropts             = nsMetadata.Options().RetentionOptions()
 		blockSize         = ropts.BlockSize()
-		shardRetriever    = shardRetrieverMgr.ShardRetriever(shard)
 		tmpCtx            = context.NewContext()
 		seriesCachePolicy = s.opts.ResultOptions().SeriesCachePolicy()
 		persistConfig     = opts.PersistConfig()
 	)
-	if seriesCachePolicy == series.CacheAllMetadata && shardRetriever == nil {
-		return fmt.Errorf("shard retriever missing for shard: %d", shard)
-	}
 
 	for start := tr.Start; start.Before(tr.End); start = start.Add(blockSize) {
 		prepareOpts := persist.DataPrepareOptions{
```
```diff
@@ -440,18 +433,6 @@ func (s *peersSource) flush(
 		case series.CacheAll:
 			// Leave the blocks in the shard result, we need to return all blocks
 			// so we can cache in memory
-		case series.CacheAllMetadata:
-			// NB(r): We can now make the flushed blocks retrievable, note that we
-			// explicitly perform another loop here and lookup the block again
-			// to avoid a large expensive allocation to hold onto the blocks
-			// that we just flushed that would have to be pooled.
-			// We are explicitly trading CPU time here for lower GC pressure.
-			metadata := block.RetrievableBlockMetadata{
-				ID:       s.ID,
-				Length:   bl.Len(),
-				Checksum: checksum,
-			}
-			bl.ResetRetrievable(start, blockSize, shardRetriever, metadata)
 		default:
 			// Not caching the series or metadata in memory so finalize the block,
 			// better to do this as we loop through to make blocks return to the
```
```diff
@@ -486,15 +467,13 @@ func (s *peersSource) flush(
 		}
 	}
 
-	// We only want to retain the series metadata in one of three cases:
+	// We only want to retain the series metadata in one of two cases:
 	// 1) CacheAll caching policy (because we're expected to cache everything in memory)
-	// 2) CacheAllMetadata caching policy (because we're expected to cache all metadata in memory)
-	// 3) PersistConfig.FileSetType is set to FileSetSnapshotType because that means we're bootstrapping
+	// 2) PersistConfig.FileSetType is set to FileSetSnapshotType because that means we're bootstrapping
 	//    an active block that we'll want to perform a flush on later, and we're only flushing here for
 	//    the sake of allowing the commit log bootstrapper to be able to recover this data if the node
 	//    goes down in-between this bootstrapper completing and the subsequent flush.
 	shouldRetainSeriesMetadata := seriesCachePolicy == series.CacheAll ||
-		seriesCachePolicy == series.CacheAllMetadata ||
 		persistConfig.FileSetType == persist.FileSetSnapshotType
 
 	if !shouldRetainSeriesMetadata {
```
```diff
@@ -202,12 +202,6 @@ func (s *dbSeries) updateBlocksWithLock() (updateBlocksResult, error) {
 			continue
 		}
 
-		if cachePolicy == CacheAllMetadata && !currBlock.IsRetrieved() {
-			// Already unwired
-			result.UnwiredBlocks++
-			continue
-		}
-
 		// Potentially unwire
 		var unwired, shouldUnwire bool
 		// IsBlockRetrievable makes sure that the block has been flushed. This
```
```diff
@@ -217,9 +211,6 @@ func (s *dbSeries) updateBlocksWithLock() (updateBlocksResult, error) {
 		switch cachePolicy {
 		case CacheNone:
 			shouldUnwire = true
-		case CacheAllMetadata:
-			// Apply RecentlyRead logic (CacheAllMetadata is being removed soon)
-			fallthrough
 		case CacheRecentlyRead:
 			sinceLastRead := now.Sub(currBlock.LastReadTime())
 			shouldUnwire = sinceLastRead >= wiredTimeout
```
```diff
@@ -235,29 +226,9 @@ func (s *dbSeries) updateBlocksWithLock() (updateBlocksResult, error) {
 		}
 
 		if shouldUnwire {
-			switch cachePolicy {
-			case CacheAllMetadata:
-				// Keep the metadata but remove contents
-
-				// NB(r): Each block needs shared ref to the series ID
-				// or else each block needs to have a copy of the ID
-				id := s.id
-				checksum, err := currBlock.Checksum()
-				if err != nil {
-					return result, err
-				}
-				metadata := block.RetrievableBlockMetadata{
-					ID:       id,
-					Length:   currBlock.Len(),
-					Checksum: checksum,
-				}
-				currBlock.ResetRetrievable(start, currBlock.BlockSize(), retriever, metadata)
-			default:
-				// Remove the block and it will be looked up later
-				s.blocks.RemoveBlockAt(start)
-				currBlock.Close()
-			}
-
+			// Remove the block and it will be looked up later
+			s.blocks.RemoveBlockAt(start)
+			currBlock.Close()
 			unwired = true
 			result.madeUnwiredBlocks++
 		}
```

So much cleanup :D
Thank god, I never understood this. Does this mean you can remove the `retriever` field from the block, as well as any logic that depends on it being present? The following functions still seem to be using the `retriever` field:

We might even get a small perf boost, since this would be millions of fewer pointers for the G.C to chase.
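To make that concrete, a rough sketch of the slimmed-down block struct this would allow. The field set is an assumption pieced together from the snippets in this PR, not the actual m3db definition:

```go
// Hypothetical dbBlock after removing the retriever plumbing that only the
// CacheAllMetadata policy needed. Fields are assumptions from this PR's
// snippets, not the real m3db struct.
type dbBlock struct {
	sync.RWMutex

	startTime time.Time
	blockSize time.Duration
	segment   ts.Segment

	// Still needed: the LRU/RecentlyRead policies check whether the block's
	// contents came off disk.
	wasRetrievedFromDisk bool

	// Removed: retriever (and the RetrievableBlockMetadata it required),
	// which is one fewer pointer per block for the GC to chase.
}
```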