-
Notifications
You must be signed in to change notification settings - Fork 455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cold flushes #1624
Cold flushes #1624
Conversation
Codecov Report
@@ Coverage Diff @@
## master #1624 +/- ##
========================================
+ Coverage 71.3% 72% +0.6%
========================================
Files 962 964 +2
Lines 80908 80318 -590
========================================
+ Hits 57747 57839 +92
+ Misses 19393 18703 -690
- Partials 3768 3776 +8
Continue to review full report at Codecov.
|
2ef92f3
to
39fafc1
Compare
74c32fd
to
aa3d985
Compare
NamespaceMetadata: nsMd, | ||
Shard: shard, | ||
BlockStart: startTime, | ||
DeleteIfExists: false, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment here please. Also, how does this work right now? I thought the flushPreparer thing would complain if the file already existed and you set this to false
.
Is it because you're mocking everything right now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, everything is mocked out right now. After the seek manager the numbered filesets change lands, DeleteIfExists should be false
, so probably keep this false
for now?
curSchema, err := schemaReg.GetLatestSchema(metadata.ID()) | ||
if curSchema != nil { | ||
curSchemaId = curSchema.DeployId() | ||
curSchemaID = curSchema.DeployId() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not from your changes but looks like the error on line 236 is unchecked. what's the deal there?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, not immediately obvious to me what to do on error, opened #1694.
src/dbnode/persist/fs/types.go
Outdated
) error | ||
} | ||
|
||
// NewMergerFn is the function to call to get a new Merger. Mostly used as a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hm if it's only used for testing, does it need to be exported?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That comment is confusing - I removed it. What I meant is that I needed the ability to pass in the merger constructor in the cold flush code in shard.go
(a different package) so that it is testable.
// data, nor does it clean up the original fileset. | ||
func NewMerger( | ||
reader DataFileSetReader, | ||
blockAllocSize int, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: instead of all the args, could you make an Options type
@@ -63,6 +64,7 @@ type flushManager struct { | |||
// are used for emitting granular gauges. | |||
state flushManagerState | |||
isFlushing tally.Gauge | |||
isColdFlushing tally.Gauge |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Super nit, I wonder if we should call these a "compaction" flush instead of cold flush? I'm not really too fussed, but interested to hear thoughts cc @justinjc @prateek @richardartoul
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Like still keep the hot/cold terminology but just call these flushes "compaction flushes" because then if repairs uses this code path, it's not just cold writes that are being persisted per-se).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Second thoughts, seems to hard to rename at this point, let's not bother.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually renamed this from compact to cold flush midway through 😅. Maybe we'll re-tackle this when the usage of cold flush differs too much later.
multiErr = multiErr.Add(err) | ||
} | ||
|
||
return multiErr.FinalError() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we sure that if there is an error here that the commit logs won't get deleted? I think the commit log cleanup logic only checks that there is a valid snapshot, not that also cold flushes contained by those commit logs were written to disk no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah right, that's true. We can either:
- Return if there's an error cold flushing
- Don't clean up commit logs unless a successful cold flush has happened too
Both run the risk of using up too much disk space if cold flushes fail, but (1) is easier to implement, so I'll do that.
|
||
nextVersion := m.retriever.RetrievableBlockColdVersion(startTime) + 1 | ||
|
||
tmpCtx := context.NewContext() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you should get one from a context pool here or else reuse one (since you are calling BlockingClose
which makes them reuseable).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, I'll have Read
take a context and share one across iterations.
multiIter.ResetSliceOfSlices(xio.NewReaderSliceOfSlicesFromBlockReadersIterator(brs), nsCtx.Schema) | ||
|
||
// tagsIter is never nil. | ||
tags, err := convert.TagsFromTagsIter(id, tagsIter, identPool) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@justinjc Yes I think with the restructuring you've done this is now safe since the tags will be valid as long as the IDs are valid, and the IDs are valid for the duration of the file writing. Can you add a comment explaining this now?
89d446e
to
8284612
Compare
|
||
// Merge merges data from a fileset with a merge target and persists it. | ||
// The caller is responsible for finalizing all resources used for the | ||
// MergeWith passed here. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you document somewhere (here or on ForEachRemaining
) that while the data passed to ForEachRemaining
is basically a copy that will be finalized when the context is closed, the ID and tags are expected to live for as long as the user of the MergeWith requires them so they should either be NoFinalize()
or passed as copies (in your case since you're taking them from the shard/series they're NoFinalize() and its safe but would like to document)
src/dbnode/storage/series/buffer.go
Outdated
if numStreams == 1 { | ||
stream = streams[0] | ||
mergedStream = streams[0] | ||
} else { | ||
// We may need to merge again here because the regular merge method does | ||
// not merge buckets that have different versions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It also doesn't merge warm and cold buckets basically right? maybe say that too
src/dbnode/storage/shard.go
Outdated
@@ -1964,7 +1959,7 @@ func (s *dbShard) ColdFlush( | |||
// Cold flushes can only happen on blockStarts that have been | |||
// warm flushed, because warm flush logic does not currently | |||
// perform any merging logic. | |||
if !s.hasWarmFlushed(t.ToTime()) { | |||
if !s.IsBlockRetrievable(t.ToTime()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you actually keep the duplicated method hasWarmFlushed
? I know its lame to duplicate but I can easily see someone changing the meaning of IsBlockRetrievable
and not expecting the impact it'll have on this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except for the last few minor comments
2b5b42c
to
4f27aef
Compare
What this PR does / why we need it:
This PR introduces cold flush logic that merges out of order writes with data on disk.
Special notes for your reviewer:
Sorry, this PR is rather large. A good place to start would be the
ColdFlush
function inshard.go
, where it identifies which blockStarts require cold flushes, then uses the merger to merge data together. As a natural follow up, look intomerger.go
to review actual merge logic, andfs_merge_with_mem.go
to review the logic for going through data in memory.Please note that this PR is just one part of implementing the ColdWrites feature. Landing of this PR does not enable the ability to do cold writes without other necessary changes, e.g. in the seek manager. As such, most of the code here is feature flagged off by default. This flag is at the namespace level (in the option
ColdWritesEnabled
). The call toColdFlush
innamespace.go
checks for this option, and returns success right away if the flag is false, hence, any logic below that (shard/series/buffer/actual merging) will never get run.Does this PR introduce a user-facing and/or backwards incompatible change?:
Does this PR require updating code package or user-facing documentation?: