-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Preserve multiple translog generations #24015
Conversation
Today when a flush is performed, the translog is committed and if there are no outstanding views, only the current translog generation is preserved. Yet for the purpose of sequence numbers, we need stronger guarantees than this. This commit migrates the preservation of translog generations to keep the minimum generation that would be needed to recover after the local checkpoint.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
basics look good. left a bunch of comments
* @param writer the index writer to commit | ||
* @param translog the translog | ||
* @param syncId the sync flush ID ({@code null} if not committing a synced flush) | ||
* @return the local checkpoint committed with the specified index writer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I don't think this is correct anymore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 0763e29c6486a566344a0e8b5e0868db4932623d.
@@ -1179,12 +1179,12 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti | |||
try { | |||
translog.prepareCommit(); | |||
logger.trace("starting commit for flush; commitTranslog=true"); | |||
commitIndexWriter(indexWriter, translog, null); | |||
final long committedGeneration = commitIndexWriter(indexWriter, translog, null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I so wish we could come up with a better name for this one - I find it confusing every time. How about translogGenForRecovery
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand this comment, this is a new local variable that was never there before. Did you maybe mean to comment on Translog#lastCommittedTranslogFileGeneration
which we have discussed the confusing nature of before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think all of them suffer from the same problem. This is just the first one I run into while reviewing. I think I get why it's named like this now (the translog id that was committed into lucene's metadata) but there is much room for ambiguity - is the the maximum generation for which all ops are in lucene (i.e., committed)? I wonder every time and end up double checking the code. If people find this name better, I'm good with keeping it - it's a subjective matter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One naming thing that gets to me is prepareCommit
and commit
in Translog
. I don't think commit
commits anything. It trims segments based on a commit in permanent storage. I wonder if it'd make the naming easier to stop using the word "commit" for the thing that the Translog is doing. Then any time you see the word "commit" it is about a lucene commit. If you renamed Translog#commit
into Translog#updatePersistedGeneration
or something then it'd be more clear that it is reacting to a commit in Lucene. Then maybe the other variable names just fall out of that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to focus on the naming in a small follow-up PR immediately after this PR.
@@ -1812,7 +1819,7 @@ public boolean isRecovering() { | |||
* Gets the commit data from {@link IndexWriter} as a map. | |||
*/ | |||
private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) { | |||
Map<String, String> commitData = new HashMap<>(6); | |||
Map<String, String> commitData = new HashMap<>(5); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good eye.
* @param committedGeneration the minimum translog generation to preserve after trimming unreferenced generations | ||
* @throws IOException if an I/O exception occurred preparing the translog commit | ||
*/ | ||
public void commit(final long committedGeneration) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this parameter name is confusing given that we have currentCommittingGeneration
. See previous suggestion for a name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's focus on naming in a folow-up PR.
}) | ||
.mapToLong(TranslogReader::getGeneration) | ||
.min() | ||
.orElseGet(this::currentFileGeneration); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm. When is this or else actually triggers? I guess it can only happens if you potentially have bwc readers and and one or more empty current ones. Should we assert for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can happen because we commit when we open the translog.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clear. This is maybe worth a comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think another point is that when we are flushing the engine, we ask the translog for the minimum generation for the local checkpoint plus one which might not yet be in the translog.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's another very good point
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 435fe25.
.stream() | ||
.filter(r -> { | ||
final Checkpoint checkpoint = r.getCheckpoint(); | ||
return checkpoint.minSeqNo <= seqNo && seqNo <= checkpoint.maxSeqNo; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm... why do we need to check the lower bound? isn't it simpler to check seqNo <= checkpoint.maxSeqNo
alone? I read the logic to be "min generation that has ops that are required if we recover from seqNo onwards", which maps to "has ops about this point"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that was addressed by c63543b which I think I pushed while you were reviewing.
for (long seqNo = min; seqNo <= max; seqNo++) { | ||
if (seenSeqNos.add(seqNo)) { | ||
assertThat( | ||
translog.getMinGenerationForSeqNo(seqNo).translogFileGeneration, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused -
say we have:
gen 1 -> [1, 2, 8]
gen 2 -> [4, 5, 6]
gen 3 -> [7, 9]
wouldn't translog.getMinGenerationForSeqNo(5)
be 1? if read this correctly it will expect it to be 2 (where 5
was indexed)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this was addressed while you were in the middle of reviewing by c63543b.
* @param committedGeneration the minimum translog generation to preserve after trimming unreferenced generations | ||
* @throws IOException if an I/O exception occurred preparing the translog commit | ||
*/ | ||
public void commit(final long committedGeneration) throws IOException { | ||
try (ReleasableLock ignored = writeLock.acquire()) { | ||
ensureOpen(); | ||
if (currentCommittingGeneration == NOT_SET_GENERATION) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure this makes sense anymore? should we just forbid it with an exception? or better yet require that the generation given to use is <= current and be done with it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you saying that we should require preparing a commit (invoking Translog#prepareCommit
) before committing (invoking Translog#commit
)?
translog.add(new Translog.NoOp(seqNo++, 0, "test")); | ||
if (rarely()) { | ||
final long generation = translog.currentFileGeneration(); | ||
translog.prepareCommit(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we sometime roll here to for kicks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Preparing a commit executes a roll, or do you mean invoke roll generation directly and skip the prepare commit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean have an extra roll between prepare commit and commit. Just for the crazy case that someone indexes a generation worth of data between the two. Better be safe than sorry, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed c4690dd.
for (int i = 0; i < operations; i++) { | ||
translog.add(new Translog.NoOp(seqNo++, 0, "test")); | ||
if (rarely()) { | ||
try (Translog.View ignored = translog.newView()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it will be stronger to hold a view that prevents clearing files even if committed. Right now the view doesn't much does it - we would clean up to committedGeneration
anyway. I should have written a test like that before - right now we only have the evil testConcurrentWriteViewsAndSnapshot, which we may want to beef up with some rolling and prepare commits as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what you're saying here. A view will prevent readers from being trimmed back to the last committed generation. Which is the point of this test, to ensure that the trim logic is right. I pushed 1817095.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got confused thinking that the view only holds a reference to the last generation (which is not true). I'm still not convinced that the old version did what we want, but that's besides the point. The new version is much more readable. Thank you.
@jasontedor your last commit was concurrent to my review. I think it did address some of the feedback. will look at it again once you responded to the rest. |
0763e29
to
5ba7402
Compare
@bleskes I've responded to your feedback, except the naming which we can worry about at the end. |
try (ReleasableLock ignored = writeLock.acquire()) { | ||
ensureOpen(); | ||
assert committedGeneration <= current.generation; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wondering - shall we make this a hard exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also, can we add a check that it's >= the lowest reader's generation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 32e14d0.
@@ -3570,6 +3591,72 @@ public long generateSeqNo() { | |||
} | |||
} | |||
|
|||
public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierException, InterruptedException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not see what can be added that is not already covered by other tests. I
Correct me if i missed it but I don't see any test that creates a situation in which we recovery from a translog that consists of multiple generations where the seq# are scattered across them. testTranslogReplay
comes the closest but I think it needs some beefing up (it always writes in order, meaning that it always recovers from the last generation).
translog.add(new Translog.NoOp(seqNo++, 0, "test")); | ||
if (rarely()) { | ||
if (randomBoolean()) { | ||
try (Translog.View ignored = translog.newView()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll be OK with only having this branch of the if. Is there anything substantial that the else clause give us?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With only the if branch, the view will always be taken from generation 1 to the last committed generation which means that these generations would never be trimmed. By having the else, sometimes earlier generations get trimmed (because there is no view to hold onto them) making the if branch a little more interesting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 42685cc.
for (int i = 0; i < operations; i++) { | ||
translog.add(new Translog.NoOp(seqNo++, 0, "test")); | ||
if (rarely()) { | ||
try (Translog.View ignored = translog.newView()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got confused thinking that the view only holds a reference to the last generation (which is not true). I'm still not convinced that the old version did what we want, but that's besides the point. The new version is much more readable. Thank you.
e -> e.getValue().stream().max(Long::compareTo).orElse(Long.MAX_VALUE))); | ||
|
||
for (long seqNo = 0; seqNo < operations; seqNo++) { | ||
final long finalLongSeqNo = seqNo; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering about this one. Now it uses exactly the same logic as the code does - it searches for the generation whose max sequence number is below the desired one. Should we not rely on our logic but rather hard test what we want - i.e., all operations above and including the seqNo
are present if you iterated on our generation->seqNo from translog.getMinGenerationForSeqNo(seqNo).translogFileGeneration
and upwards?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed afb86e8.
final Map<Long, List<Long>> generations = new HashMap<>(); | ||
|
||
for (int i = 0; i < operations; i++) { | ||
final Long seqNo = seqNos.get(i); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be great to add duplicate seq# to this test. The real engine needs to deal with multiple versions of the same seq#, distinguished only by their associated term.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 4486007.
}) | ||
.mapToLong(TranslogReader::getGeneration) | ||
.min() | ||
.orElseGet(this::currentFileGeneration); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clear. This is maybe worth a comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx @jasontedor. I did another pass.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did one review and left some comments!
*/ | ||
public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) { | ||
try (ReleasableLock ignored = writeLock.acquire()) { | ||
final long minTranslogFileGeneration = readers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we can just use a for loop and Math.min
this is much harder for me to read instead of:
long minTranslogFileGeneration = this.currentFileGeneration;
for (Reader r : readers) {
if (seqNo <= r.getCheckpoint().maxSeqId) {
minTranslogFileGeneration = Math.min(minTranslogFileGeneration, r. getGeneration());
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 435fe25.
* | ||
* @throws IOException if an I/O exception occurred while rolling the translog generation | ||
*/ | ||
public void prepareCommit() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now that we don't use the 2phase commit interface anymore we can rename these methods IMO
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, let's do this in a small PR that immediately follows this PR.
try (ReleasableLock ignored = writeLock.acquire()) { | ||
ensureOpen(); | ||
if (committedGeneration > current.generation) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when would this happen? I am not sure I think we should do this. If the translog is so fragile that we can fail a commit based on a userinput I think the design is broken?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should model this very similar to the IndexWriter where we have a deletion policy which can be a simple long -> long function of some sort. The InternalEngine can control this policy and by default we just use a policy that returns the identity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bleskes asked for hard failures here instead of assertions.
My rationale was that if we have a bug and we get this wrong, I rather not commit. That said - this is all very murky and I'm good with an assertion if either of you prefer it that way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still don't like this really. lets compromise on Translog#commit(Translog.TranslogGeneration generation)
and you can add an overloaded method for tests that doesn't take any argument.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed e6abaf9.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we not pass in a long but rather Translog.TranslogGeneration
? this would make it less trappy?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked into this suggestio. I'm not sure if it makes it less trappy (I'm also not sure if it's trappy to begin with). I do think that adding the extra indirection does make the API awkward though. A commit from the engine here is really about telling the translog to keep all generations on of after the provided generation number so it seems clearer to stay the the API being driven off the generation number? Do you feel strongly about this would be an improvement @s1monw?
minReferencedGen = Math.min(lastCommittedTranslogFileGeneration, minReferencedGen); | ||
final long finalMinReferencedGen = minReferencedGen; | ||
List<TranslogReader> unreferenced = readers.stream().filter(r -> r.getGeneration() < finalMinReferencedGen).collect(Collectors.toList()); | ||
long minReferencedGen = Math.min( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did something change here that is related it's hard to tell maybe we can revert this unnecessary modification?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see a small improvement here which is removing the need for the extra local variable finalMinReferencedGen
and so avoiding overwriting minReferencedGen
. I find it easier to read this way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fair enough
ensureCanFlush(); | ||
try { | ||
Translog.TranslogGeneration translogGeneration = translog.getGeneration(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we bring these local vars back it's easier to reason about the code when it's not inline
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 5f3d2d8.
minReferencedGen = Math.min(lastCommittedTranslogFileGeneration, minReferencedGen); | ||
final long finalMinReferencedGen = minReferencedGen; | ||
List<TranslogReader> unreferenced = readers.stream().filter(r -> r.getGeneration() < finalMinReferencedGen).collect(Collectors.toList()); | ||
long minReferencedGen = Math.min( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fair enough
* master: (41 commits) Remove awaits fix from evil JNA native tests Correct handling of default and array settings Build: Switch jna dependency to an elastic version (elastic#24081) fix CategoryContextMappingTests compilation bugs testConcurrentGetAndSetOnPrimary - fix a race condition between indexing and updating value map Allow different data types for category in Context suggester (elastic#23491) Restrict build info loading to ES jar, not any jar (elastic#24049) Remove more hidden file leniency from plugins Register error listener in evil logger tests Detect using logging before configuration [DOCS] Added note about Elastic Cloud to improve 'elastic aws' SERP results. Add version constant for 5.5 (elastic#24075) Add unit tests for NestedAggregator (elastic#24054) Add more debugging information to rethrottles Tests: Use random analyzer only on string fields in Match/MultiMatchBuilderTests Cleanup outdated comments for fixing up pom dependencies (elastic#24056) S3 Repository: Eagerly load static settings (elastic#23910) Reject duplicate settings on the command line Wildcard cluster names for cross cluster search (elastic#23985) Update scripts/security docs for sandboxed world (elastic#23977) ...
6334cc4
to
594e9ab
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Left some nits. no need for another cycle on my end.
SequenceNumbersService.UNASSIGNED_SEQ_NO) { | ||
@Override | ||
public long generateSeqNo() { | ||
return seqNos.get(counter.getAndIncrement()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice
try { | ||
recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); | ||
recoveringEngine.recoverFromTranslog(); | ||
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be great to assert on the number of docs recovered. It's hard but I think it's good to keep a tight control of this one - it's complicated code and we need to know it keeps on doing what we expect it to.
final List<Long> shuffledSeqNos = LongStream.range(0, operations).boxed().collect(Collectors.toList()); | ||
Randomness.shuffle(shuffledSeqNos); | ||
final List<Tuple<Long, Long>> seqNos = new ArrayList<>(); | ||
final Map<Long, Long> terms = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
call this highestTermPerSeqNo?
LGTM |
Today when a flush is performed, the translog is committed and if there are no outstanding views, only the current translog generation is preserved. Yet for the purpose of sequence numbers, we need stronger guarantees than this. This commit migrates the preservation of translog generations to keep the minimum generation that would be needed to recover after the local checkpoint.
Relates #10708