[dbnode] Add forward index write capability #1613
Conversation
…s correctly written to the future block
Could you wire the configuration for this feature up to the YAML and add some documentation about it (a comment over the YAML, markdown, or whatever)?
Also, are there any implications for correctness here? I.e. I think this feature may impact the results of aggregation queries. That's probably fine, but worth documenting.
// forwardIndexDice is a die roll that adds a chance for incoming index writes
// arriving near a block boundary to be duplicated and written to the next block
// index, adding jitter and smoothing index load.
"and smoothing index load so that block boundaries do not cause a huge influx of new documented that need to be indexed all at once"
fmt.Errorf("invalid forward write threshold %f", threshold) | ||
} | ||
|
||
bufferFragment := float64(bufferFuture) * threshold |
I don't understand why there is a relationship between bufferFuture and the threshold. I guess in my mind bufferFuture would set a maximum on the threshold, so if bufferFuture is 10 minutes and you have a block size of 2 hours then the earliest you could start forward indexing would be at 1:50 PM, but I don't understand the logic here where you multiply bufferFuture by some threshold.
Yeah, so threshold is defined as a fraction of the bufferFuture to allow some tweaking of when forward indexing starts; my thinking is you'd put the threshold at 0.9 or something, and then in your example we'd start forward indexing at 1:51 PM, just to cut down on any possibility of trying to write the values too early in case of positive skew on the emitter.
i.e. bufferFragment ∈ (0, bufferFuture]
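To make that relationship concrete, here's a minimal runnable sketch of the idea; the function and variable names are illustrative, not the PR's actual code:

package main

import (
	"fmt"
	"time"
)

// forwardIndexStart computes the earliest time a write becomes eligible for
// forward indexing: a fraction (threshold) of bufferFuture before the next
// block boundary. Names here are assumptions for illustration only.
func forwardIndexStart(nextBlockStart time.Time, bufferFuture time.Duration, threshold float64) time.Time {
	bufferFragment := time.Duration(float64(bufferFuture) * threshold)
	return nextBlockStart.Add(-bufferFragment)
}

func main() {
	// 2h blocks, next boundary at 2:00 PM, bufferFuture of 10 minutes.
	nextBlockStart := time.Date(2019, 5, 10, 14, 0, 0, 0, time.UTC)
	// With threshold 0.9, forward indexing starts at 1:51 PM rather than 1:50 PM.
	fmt.Println(forwardIndexStart(nextBlockStart, 10*time.Minute, 0.9))
}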
Mind adding an explanation for this in a comment?
@@ -0,0 +1,61 @@
// Copyright (c) 2019 Uber Technologies, Inc.
I coulda sworn we already had a dice package that we use in a few different places. Is this just coming from m3x?
This is from internal statsdex; I think there's a sampler package we have already, but it performs slightly differently (i.e. NewSampler(0.1).Sample() returns true iff it's been called a multiple of 10 times).
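For contrast, a rough sketch of the difference (the internal sampler's real implementation may differ; this just illustrates deterministic vs. probabilistic sampling):

package sampling

import "math/rand"

// countingSampler fires on every nth call, the behaviour described above for
// NewSampler: deterministic, so bursts of writes hit the same decisions.
type countingSampler struct {
	n, count int64
}

func (s *countingSampler) Sample() bool {
	s.count++
	return s.count%s.n == 0
}

// dice fires with probability 1/n on each roll independently, which spreads
// the duplicated index writes rather than batching them.
type dice struct {
	n   int64
	rng *rand.Rand
}

func (d *dice) Roll() bool {
	return d.rng.Int63n(d.n) == 0
}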
src/x/dice/dice.go
Outdated
}

func (d *epoch) Roll() bool {
	return (time.Now().UnixNano()+d.jitter)%d.r == 0
out of curiosity, why time over a rand generator? Thread safety?
src/x/dice/dice.go
Outdated
	Roll() bool
}

// NewEpochDice constructs a new Dice based on UNIX epoch time.
Can you clarify this comment to explain that it uses the current time as a source of randomness?
src/dbnode/storage/index.go
Outdated
@@ -524,8 +539,15 @@ func (i *nsIndex) writeBatches(
	batch.MarkUnmarkedEntryError(m3dberrors.ErrTooPast, idx)
	return
}

if i.forwardIndexDice.forwardIndexing(entry.Timestamp) {
	forwardWriteEntry := entry
Is this a safe clone? There are three fields that are "pointer" type:
- OnIndexSeries
- result
- resultVal
Regardless, I wonder if it's worth adding a clone() method so that even if this operation is safe now, it's more likely to stay safe in the future.
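A hedged sketch of what such a clone() helper could look like; the struct and field names below are stand-ins taken from this comment, not the real WriteBatchEntry definition:

package index

// stand-in types for illustration; the real ones live elsewhere in the package.
type onIndexSeries struct{}
type indexResult struct{}

type writeBatchEntry struct {
	OnIndexSeries *onIndexSeries
	result        *indexResult
	resultVal     *indexResult
}

// clone makes copying explicit. A plain struct copy duplicates the pointer
// values, so both entries still alias the same OnIndexSeries/result/resultVal;
// if that ever becomes unsafe, this is the single place to deep-copy or
// re-acquire those fields.
func (e writeBatchEntry) clone() writeBatchEntry {
	cloned := e
	return cloned
}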
src/dbnode/storage/index.go
Outdated
if i.forwardIndexDice.forwardIndexing(entry.Timestamp) {
	forwardWriteEntry := entry
	forwardWriteEntry.Timestamp = entry.Timestamp.Add(blockSize)
Is the timestamp used for anything other than figuring out which block to put it in? Also, I may be wrong but my understanding of the forward indexing logic seemed to imply that you could forward index stuff that wasn't yet within the bufferFuture window. Is that correct? If so, is it guaranteed there is a block available for this write to get indexed into?
Given the current behaviour, it will only forward index within a percentage of the bufferFuture window.
src/dbnode/storage/index.go
Outdated
})

batch.AppendAll(forwardWriteBatch)
Unfortunately this isn't safe as is. When trying to index a series, we track the last successful block starts when it was indexed (in *lookup.Entry via the shard map); and further, we maintain a reference count to the series entry. Both of these are updated via the OnIndexSeries per element in the WriteBatchEntry.
By just appending entries the way it's being done right now, it's definitely going to mess up the reference count (because you'll dec for each copy, even though we only have an inc on the original).
It won't necessarily mess up the last successful write times, but we probably need to consider some nuances there too.
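To illustrate the reference counting hazard with a minimal, self-contained sketch (the types here are illustrative stand-ins, not m3db's actual lifecycle hooks):

package main

import "fmt"

type refCounted struct{ refs int }

func (r *refCounted) inc() { r.refs++ }
func (r *refCounted) dec() { r.refs-- }

// entry stands in for a write batch entry holding the shared lifecycle hook.
type entry struct{ series *refCounted }

func main() {
	series := &refCounted{refs: 1} // the single inc taken for the original entry

	original := entry{series: series}
	forward := original // shallow copy for the forward index write; no extra inc

	// If indexing finalizes (decs) once per entry, the shared count is
	// decremented twice against one increment and goes negative.
	for _, e := range []entry{original, forward} {
		e.series.dec()
	}
	fmt.Println(series.refs) // -1: either inc on copy, or dec only the last copy
}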
Re: reference counting - we can either inc the appropriate references when making copies (safest), or ensure we only dec the final copy amongst all the ones we make here (it can't be the original; it has to be the last one we track, because of the cleanup semantics in the Shard).
Re: tracking the last successful write time update. Here's a good explanation.
AFAICT, the changes you're introducing can make it so that we're writing to 4 block starts (instead of the 3 we currently assume worst case). I'm not 100% sure that's accurate though. We should work that out, and if so it makes sense to bump the size of the static alloc from 3 to 4.
Codecov Report
@@ Coverage Diff @@
## master #1613 +/- ##
========================================
+ Coverage 71.9% 72% +<.1%
========================================
Files 954 956 +2
Lines 79170 79236 +66
========================================
+ Hits 56989 57057 +68
+ Misses 18460 18454 -6
- Partials 3721 3725 +4
Continue to review full report at Codecov.
src/x/dice/dice.go
Outdated
}

func (d *epoch) Roll() bool {
	return rand.Int63()%d.r == 0
Avoid using the global rand, it's a locked source: https://golang.org/src/math/rand/rand.go?s=9470:9488#L288. Better to control whatever concurrency you need yourself.
Could you also use the github.com/MichaelTJones/pcg RNG? It's a lot quicker/cheaper than the standard one.
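One way to address this, sketched loosely (not the PR's final code): give each dice its own rand.Rand rather than using the package-level functions, and let the caller decide how to synchronize. The pcg source mentioned above could be swapped in as a faster rand.Source.

package dice

import (
	"math/rand"
	"time"
)

// epochDice keeps its own rand.Rand so rolls never touch the locked global
// source. A *rand.Rand is not safe for concurrent use, so callers must
// serialize access themselves (see the locking discussion below).
type epochDice struct {
	r   int64
	rng *rand.Rand
}

func newEpochDice(rate int64) *epochDice {
	return &epochDice{
		r:   rate,
		rng: rand.New(rand.NewSource(time.Now().UnixNano())),
	}
}

func (d *epochDice) Roll() bool {
	return d.rng.Int63()%d.r == 0
}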
}

// forwardIndexing decides if a timestamp is eligible for forward index writes.
func (o *forwardIndexDice) forwardIndexing(timestamp time.Time) bool {
nit: rename to something like shouldIndex or roll to indicate it's a test; the current method name doesn't convey what you're testing for.
SGTM, changed to roll to preserve the dice context.
src/dbnode/storage/index.go
Outdated
@@ -524,8 +539,23 @@ func (i *nsIndex) writeBatches(
	batch.MarkUnmarkedEntryError(m3dberrors.ErrTooPast, idx)
	return
}

if i.forwardIndexDice.forwardIndexing(entry.Timestamp) {
Once you fix the global rand usage, you'll need to be careful about using this dice.
If we already have a lock that we can use to protect the dice, then it's gucci. If not, we'll have to think about the contention it'll introduce to add a lock here.
I'll see if there's a way to avoid having to take a lock here; there's a lock above batch processing I could roll in, but that would mean the forward write would be applied to the whole batch. If that's fine, it should be easy 👍
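If a dedicated lock does turn out to be necessary, one simple option (purely illustrative, not the PR's code) is a small wrapper around the Dice interface seen earlier; whether the contention is acceptable is the open question above.

package dice

import "sync"

// Dice matches the Roll() bool interface shown earlier in this file.
type Dice interface {
	Roll() bool
}

// lockedDice serializes rolls so a non-thread-safe RNG can be shared across
// the goroutines doing batch processing.
type lockedDice struct {
	mu   sync.Mutex
	dice Dice
}

func (d *lockedDice) Roll() bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.dice.Roll()
}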
}

func (m mockOnIndexSeries) NeedsIndexUpdate(indexBlockStartForWrite xtime.UnixNano) bool {
Can you just return false always? It doesn't seem like you're ever updating the value.
Yeah fair, had it like this in case we wanted to be able to return different values in the future
reQuery, err := m3ninxidx.NewRegexpQuery([]byte("name"), []byte("val.*"))
assert.NoError(t, err)

// NB: query both the current and the next index block to ensure that the
I don't think this is confirming that the write exists in the next block, because you're querying both the current and the next block. Need to ensure you're checking both individually, no?
It should be fine; now is 5 minutes before the block boundary, and this will check the range from 6 to 4 minutes before the boundary for the current block, and then from 6 to 4 minutes before the next block boundary to check only the future block.
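A tiny sketch of those two windows (the variable names are illustrative, not the test's exact code); the first range only falls inside the current block and the second only inside the future one:

package main

import (
	"fmt"
	"time"
)

func main() {
	blockSize := 2 * time.Hour
	boundary := time.Date(2019, 5, 10, 14, 0, 0, 0, time.UTC) // current block's end
	now := boundary.Add(-5 * time.Minute)                      // 1:55 PM

	// 6 to 4 minutes before the current boundary: hits only the current block.
	currentStart, currentEnd := now.Add(-time.Minute), now.Add(time.Minute)
	// 6 to 4 minutes before the next boundary: hits only the future block.
	futureStart, futureEnd := currentStart.Add(blockSize), currentEnd.Add(blockSize)

	fmt.Println(currentStart, currentEnd) // 13:54 to 13:56
	fmt.Println(futureStart, futureEnd)   // 15:54 to 15:56
}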
reQuery, err := m3ninxidx.NewRegexpQuery([]byte("name"), []byte("val.*"))
assert.NoError(t, err)

// NB: query both the current and the next index block to ensure that the
here too?
Should be the same here.
	lifecycle = index.NewMockOnIndexSeries(ctrl)
)

lifecycle.EXPECT().OnIndexFinalize(xtime.ToUnixNano(ts))
super nit: mind putting these in gomock.InOrder(...) as much as possible?
LGTM save the in order nit
	batch.MarkUnmarkedEntryError(m3dberrors.ErrTooPast, idx)
	return
}

if forwardIndexDice.roll(ts) {
Should this also be if forwardIndexEnabled && ..? Maybe add a test that would have caught this.
The !enabled case should be caught here: https://github.com/m3db/m3/pull/1613/files/70b0912726891694461e47b7d21499aeddb8bbca#diff-b5380446b59ebd51b8b8a482df15c4d7R93
What this PR does / why we need it:
This PR eases index pressure at block boundaries by giving incoming writes near the boundary a chance to be dual-written to the upcoming index block, based on config-defined probabilities.
Special notes for your reviewer:
Ensure that no further changes to the timestamp are required.
Does this PR introduce a user-facing and/or backwards incompatible change?:
Does this PR require updating code package or user-facing documentation?: