Add support for reading from snapshot files #757
Codecov Report
@@            Coverage Diff             @@
##           master     #757      +/-   ##
==========================================
- Coverage   78.33%   78.21%   -0.13%
==========================================
  Files         355      355
  Lines       30188    30570     +382
==========================================
+ Hits        23649    23909     +260
- Misses       4995     5079      +84
- Partials     1544     1582      +38
stream := encoder.Stream()
if stream == nil {
	return fmt.Errorf("nil stream")
	// None of the datapoints passed the predicate.
nit: Can we just use some int here to count how many datapoints we encoded or something? I'd like to one day get rid of the silly nil return type that encoder.Stream() returns if no data is encoded.
We could also just change the return type to be stream, ok := encoder.Stream() and return a bool as a second argument.
Can we open an issue for this?
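For illustration, the stream, ok suggestion above could look roughly like the sketch below. The datapoint and encoder types, the byte layout, and the encodeMatching helper are hypothetical stand-ins, not the actual M3 encoder API; the sketch only shows the two ideas from the comment: counting encoded datapoints and returning a bool instead of relying on a nil stream.

```go
package main

import "fmt"

// datapoint and encoder are hypothetical stand-ins for illustration only;
// they are not the M3 types.
type datapoint struct {
	timestamp int64
	value     float64
}

type encoder struct {
	buf []byte
	n   int // number of datapoints encoded so far
}

// Encode appends a datapoint (the two-byte layout is a placeholder).
func (e *encoder) Encode(dp datapoint) {
	e.buf = append(e.buf, byte(dp.timestamp), byte(dp.value))
	e.n++
}

// Stream returns the encoded bytes, and ok=false when nothing was encoded,
// so callers do not have to compare the result against nil.
func (e *encoder) Stream() ([]byte, bool) {
	if e.n == 0 {
		return nil, false
	}
	return e.buf, true
}

// encodeMatching encodes the datapoints that pass the predicate and
// reports how many it kept.
func encodeMatching(e *encoder, dps []datapoint, keep func(datapoint) bool) int {
	encoded := 0
	for _, dp := range dps {
		if keep(dp) {
			e.Encode(dp)
			encoded++
		}
	}
	return encoded
}

func main() {
	e := &encoder{}
	dps := []datapoint{{1, 2}, {3, 4}}
	n := encodeMatching(e, dps, func(dp datapoint) bool { return dp.value > 10 })
	if _, ok := e.Stream(); !ok {
		fmt.Println("no datapoints passed the predicate, encoded:", n)
	}
}
```

Either signal (the count or the bool) lets the caller distinguish "nothing matched" from a real error without inspecting a nil return value.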
src/dbnode/persist/fs/files.go
// This is the default size that the standard library uses. We've
// redefined it here to make some of public APIs nicer and not
// require the user to specify this value.
defaultBufioReaderSize = 4096
You could probably make this a var and infer it, yeah? I.e. var defaultBufioReaderSize = bufio.NewReader(nil).Size(). I tested this and it seems to work.
Oh nice, that's cool.
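A minimal runnable version of the suggestion above, assuming Go 1.10 or later (where bufio.Reader gained the Size method). The variable name matches the one in the diff; the surrounding program is only scaffolding for the sketch.

```go
package main

import (
	"bufio"
	"fmt"
)

// Inferred from the standard library instead of hard-coding the value.
// The reader is never read from, so passing a nil io.Reader is safe here.
var defaultBufioReaderSize = bufio.NewReader(nil).Size()

func main() {
	fmt.Println(defaultBufioReaderSize)
}
```

The trade-off is that the value becomes a var rather than a const, so it can no longer be used in constant expressions.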
s.log.
	WithFields(
		xlog.NewField("namespace", mostRecentSnapshot.ID.Namespace),
		xlog.NewField("blockStart", mostRecentSnapshot.ID.BlockStart),
Perhaps best to log this as string? BlockStart.String()
done
// commit logs to read.
s.log.Infof(
	"no snapshots for shard: %d and blockStart: %d",
	shard, blockStart.Unix())
Perhaps best to log this as string like we do elsewhere? blockStart.String()
done
if !isMultipleOfBlockSize {
	return nil, fmt.Errorf(
		"received bootstrap range that is not multiple of blockSize, blockSize: %d, start: %d, end: %d",
		blockSize, currRange.End.Unix(), currRange.Start.Unix(),
Perhaps best to log dates as string like we do elsewhere?
done
s.log.Infof(
	"reading snapshot for shard: %d and blockStart: %d and volume: %d",
	shard, blockStart.Unix(), mostRecentCompleteSnapshot.ID.VolumeIndex)
Perhaps best to log this as string like we do elsewhere? blockStart.String()
done
// Should never happen.
return nil, nil, fmt.Errorf(
	"%s shard: %d and block: %d had zero value for most recent snapshot time",
	instrument.InvariantViolatedMetricName, shard, block.ToTime().Unix())
Perhaps best to log this as string like we do elsewhere? block.ToTime().String()
done
s.log.Infof(
	"most recent snapshot for block: %d and shard: %d is %d",
	block.ToTime().Unix(), shard, mostRecent.CachedSnapshotTime.Unix())
Perhaps best to log these dates as string like we do elsewhere?
done
for block, minSnapshotTime := range minimumMostRecentSnapshotTimeByBlock {
	s.log.Infof(
		"min snapshot time for block: %d is: %d",
		block.ToTime().Unix(), minSnapshotTime.Unix())
Perhaps best to log these dates as string like we do elsewhere?
done
s.log.
	Infof(
		"opting to read commit log: %s with start: %d and duration: %s",
		fileName, fileStart.Unix(), fileBlockSize.String())
Perhaps best to log this as string like we do elsewhere? fileStart.String()
done
s.log.
	Infof(
		"opting to skip commit log: %s with start: %d and duration: %s",
		fileName, fileStart.Unix(), fileBlockSize.String())
Perhaps best to log this as string like we do elsewhere? fileStart.String()
done
if !snapshotData.IsEmpty() {
	// No snapshot or commit log data, skip.
	// TODO: When we start allowing peer bootstrapping after commit log bootstrapping,
	// then we will need to change this to mark the shard time ranges as unfulfilled.
Hm any reason not to do this now?
I think this was an unclear comment. It's more that, in general, we'll need to be smarter about whether the existence or absence of snapshot files constitutes success (with no data) or unfulfilled. I think we discussed this briefly and talked about using the shard initialization state. That change alone will probably warrant a one-page design doc.
if unmergedShard.series == nil {
	continue
if !snapshotData.IsEmpty() {
	// No snapshot or commit log data, skip.
Shouldn't this be if snapshotData.IsEmpty() { ... } then? If this is really a bug, maybe we should add a test case too.
Fixed. It was not technically a bug (it did not trigger data loss) but was quite misleading. We now just do a continue at the top if unmerged.series == nil, because that means we're not bootstrapping that one.
// instead of re-allocating them. This requires that the ident.Tags that is
// returned will have the same (or shorter) life time as the seriesID,
// otherwise the operation is unsafe.
func TagsFromTagsIter(
Nw, good stuff.
src/dbnode/storage/shard.go
}
err := tags.Err()
clonedTags, err = convert.TagsFromTagsIter(
	clonedID, tags.Duplicate(), s.identifierPool)
Hm, you're now closing the tags passed in rather than the duplicated tags. This might be an issue, especially for the OnRetrieveBlock(...) code path, which itself closes the original tags passed to this method (i.e. it would be a double close).
I think you should keep the same tags = tags.Duplicate() as before, before passing to the TagsFromTagsIter(...) call, then be sure to close that duplicate but not the original.
While you're at this for the other code path, can you add tagsIter.Close() after the call to s.insertSeriesSync(...) inside (s *dbShard) Bootstrap(bootstrappedSeries *result.Map) error?
discussed and handled in person
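The ownership rule from this thread (work on a duplicate, close only the duplicate, leave the original to its caller) can be sketched as follows. The tagIter type and cloneTags function are hypothetical and only model the behaviour described in the comments; they are not M3's ident.TagIterator or the actual shard code.

```go
package main

import (
	"errors"
	"fmt"
)

// tagIter is a hypothetical iterator used only to illustrate ownership;
// it is not M3's ident.TagIterator.
type tagIter struct {
	tags   []string
	pos    int
	closed bool
}

// Duplicate returns an independent iterator over the same tags.
func (it *tagIter) Duplicate() *tagIter { return &tagIter{tags: it.tags} }

// Next returns the next tag, or false once the iterator is exhausted.
func (it *tagIter) Next() (string, bool) {
	if it.pos >= len(it.tags) {
		return "", false
	}
	t := it.tags[it.pos]
	it.pos++
	return t, true
}

// Close reports an error on a second call so a double close is visible.
func (it *tagIter) Close() error {
	if it.closed {
		return errors.New("double close")
	}
	it.closed = true
	return nil
}

// cloneTags reads from a duplicate and closes only that duplicate,
// leaving the caller's iterator for the caller to close.
func cloneTags(orig *tagIter) []string {
	dup := orig.Duplicate()
	defer dup.Close()
	var out []string
	for {
		t, ok := dup.Next()
		if !ok {
			break
		}
		out = append(out, t)
	}
	return out
}

func main() {
	orig := &tagIter{tags: []string{"host=a", "dc=b"}}
	cloned := cloneTags(orig)
	// The caller still owns orig, so this is the first Close on it.
	fmt.Println(cloned, orig.Close())
}
```

If cloneTags closed orig instead of dup, the caller's own Close would become the second one, which is the double close the review comment warns about.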
src/dbnode/storage/shard.go
tags.Close()

if tagsIter.Remaining() > 0 {
	dupTagsIter := tagsIter.Duplicate()
You need to duplicate before doing the Remaining() check, because the iterator may be at the end of its iteration and have Remaining() == 0 in that case (before duplication).
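A small sketch of why the order matters, under the assumption (implied by the comment, not verified against the M3 source) that Duplicate() yields an iterator positioned at the start. The tagIter type and hasTags helper are hypothetical.

```go
package main

import "fmt"

// tagIter is a hypothetical iterator, not M3's ident.TagIterator.
type tagIter struct {
	tags []string
	pos  int
}

// Next advances the iterator and reports whether a tag was consumed.
func (it *tagIter) Next() bool {
	if it.pos >= len(it.tags) {
		return false
	}
	it.pos++
	return true
}

// Remaining reports how many tags are left from the current position.
func (it *tagIter) Remaining() int { return len(it.tags) - it.pos }

// Duplicate returns a fresh iterator positioned at the start
// (an assumption made for this sketch).
func (it *tagIter) Duplicate() *tagIter { return &tagIter{tags: it.tags} }

// hasTags duplicates first and checks Remaining on the duplicate, so an
// already-exhausted input iterator does not hide the tags it carries.
func hasTags(it *tagIter) bool {
	dup := it.Duplicate()
	return dup.Remaining() > 0
}

func main() {
	it := &tagIter{tags: []string{"host=a"}}
	for it.Next() {
		// Exhaust the original iterator.
	}
	fmt.Println(it.Remaining() > 0, hasTags(it))
}
```

Checking Remaining() on the exhausted original reports no tags, while the check on the duplicate still sees them.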
LGTM
Adds support for reading from snapshot files during commit log bootstrap, including unit, integration, and property tests.