CCR: Replicate existing ops with old term on follower #34412
Conversation
Since #34288, we might hit a deadlock if the FollowTask has more fetchers than writers. This can happen in the following scenario. Suppose the leader has two operations [seq#0, seq#1], and the FollowTask has two fetchers and one writer.

1. The FollowTask issues two concurrent fetch requests, {from_seq_no: 0, num_ops: 1} and {from_seq_no: 1, num_ops: 1}, to read seq#0 and seq#1 respectively.
2. The second request, which fetches seq#1, completes first and triggers a write request containing only seq#1.
3. The primary of the follower fails after it has replicated seq#1 to its replicas.
4. Since the old primary did not respond, the FollowTask resends the previous write request containing seq#1.
5. The new primary already has seq#1; thus it won't replicate seq#1 to the replicas but will wait for the global checkpoint to advance to at least seq#1.

The problem is that the FollowTask has only one writer, and that writer is waiting for seq#0, which won't be delivered until the writer completes. This PR proposes to delay write requests if there is a gap in the write buffer. With this change, if a writer is waiting for seq_no N, then all operations below N have been delivered or are scheduled for delivery by other writers.
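To make the gap check concrete, here is a minimal sketch of the idea, assuming a hypothetical buffer keyed by seq_no (the class and field names are invented for illustration, not the actual FollowTask code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class GapAwareWriteBuffer {
    private final PriorityQueue<Long> buffer = new PriorityQueue<>(); // pending seq_nos
    private long lastScheduledSeqNo = -1;                             // everything <= this is delivered or in flight

    synchronized void add(long seqNo) {
        buffer.add(seqNo);
    }

    /** Returns the next contiguous batch, or an empty list if the head has a gap. */
    synchronized List<Long> pollContiguousBatch() {
        List<Long> batch = new ArrayList<>();
        // only hand out operations while they extend the delivered prefix;
        // a gap (head != lastScheduledSeqNo + 1) delays the write request
        while (buffer.isEmpty() == false && buffer.peek() == lastScheduledSeqNo + 1) {
            lastScheduledSeqNo = buffer.poll();
            batch.add(lastScheduledSeqNo);
        }
        return batch;
    }
}
```

In the deadlock scenario above, the resent write request for seq#1 would be held back until seq#0 has been handed to a writer, so the single writer can never be blocked behind an undelivered operation.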
Pinging @elastic/es-distributed
Another approach is to let the following primary wait for the advancement of the global checkpoint only if its local checkpoint is at least the waiting-for global checkpoint. Otherwise, it returns the unapplied operations to the FollowTask without waiting. In the latter case, the FollowTask puts the unapplied operations back into the buffer, then delivers the head of the buffer (i.e., the operations before the waiting-for global checkpoint), which is the current behavior.
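A rough sketch of this alternative, with invented class and method names standing in for the real request handling:

```java
import java.util.Collections;
import java.util.List;

class FollowerPrimarySketch {
    static final class Op {
        final long seqNo;
        Op(long seqNo) { this.seqNo = seqNo; }
    }

    private long localCheckpoint = -1;

    /**
     * Wait for the global checkpoint only when everything below it has been
     * applied locally; otherwise return the unapplied ops so the FollowTask
     * can put them back into its buffer and deliver the head first.
     */
    List<Op> applyOrReturn(List<Op> ops, long waitingForGcp) {
        if (localCheckpoint >= waitingForGcp) {
            awaitGlobalCheckpoint(waitingForGcp); // safe: no gap below waitingForGcp
            return Collections.emptyList();
        }
        return ops;
    }

    private void awaitGlobalCheckpoint(long gcp) {
        // placeholder for blocking until the global checkpoint reaches gcp
    }
}
```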
 * Looks up the primary term for a given seq_no in the provided directory reader. The caller must ensure that an operation with the
 * given {@code seqNo} exists in the provided {@code directoryReader}; otherwise this method will throw {@link IllegalStateException}.
 */
public static long lookupPrimaryTerm(final DirectoryReader directoryReader, final long seqNo) throws IOException {
@jimczi Could you please have a look at this Lucene code?
since this wraps all docs live, I am not sure it should live here in this class?
@bleskes This is ready. Could you please give it a shot?
int docId;
while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
    // make sure to skip the non-root nested documents
    if (primaryTermDV.advanceExact(docId - leaf.docBase) && primaryTermDV.longValue() > 0) {
The docIdSetIterator returns the leaf doc id, so you can use it directly to advance the primaryTermDV?
Good catch. I pushed 6bf3f1d.
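For reference, the corrected loop amounts to the following sketch in plain Lucene types (not the exact commit): both the query's iterator and the doc values are per-leaf, so they share the same leaf-relative doc id, and subtracting leaf.docBase as in the pre-fix hunk would double-adjust it.

```java
import java.io.IOException;
import java.util.OptionalLong;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;

final class PrimaryTermLookupSketch {
    static OptionalLong termForLeaf(DocIdSetIterator it, NumericDocValues primaryTermDV) throws IOException {
        int docId;
        while ((docId = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
            // the iterator's doc id is already leaf-relative; pass it straight
            // to advanceExact, and skip non-root nested docs (no term value)
            if (primaryTermDV.advanceExact(docId) && primaryTermDV.longValue() > 0) {
                return OptionalLong.of(primaryTermDV.longValue());
            }
        }
        return OptionalLong.empty();
    }
}
```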
left some comments
final Query query = LongPoint.newExactQuery(SeqNoFieldMapper.NAME, seqNo);
final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
// iterate backwards since the existing operation is likely in the most recent segments.
for (int i = reader.leaves().size() - 1; i >= 0; i--) {
is this optimization really relevant here? I wonder if we can't just do an ordinary search and then look up the leaf reader based on the top hits?
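For comparison, the ordinary-search variant being suggested could look roughly like this (a sketch only; the class and method names are illustrative, and the skipping of non-root nested documents from the PR is elided):

```java
import java.io.IOException;
import java.util.List;
import java.util.OptionalLong;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TopDocs;

final class TopHitTermLookup {
    /**
     * Run the seq_no query as an ordinary search and resolve the leaf that
     * holds the (single) hit from its global doc id.
     */
    static OptionalLong lookup(IndexSearcher searcher, Query seqNoQuery, String termField) throws IOException {
        TopDocs topDocs = searcher.search(seqNoQuery, 1);
        if (topDocs.scoreDocs.length == 0) {
            return OptionalLong.empty();
        }
        int globalDocId = topDocs.scoreDocs[0].doc;
        List<LeafReaderContext> leaves = searcher.getIndexReader().leaves();
        LeafReaderContext leaf = leaves.get(ReaderUtil.subIndex(globalDocId, leaves));
        NumericDocValues termDV = leaf.reader().getNumericDocValues(termField);
        if (termDV != null && termDV.advanceExact(globalDocId - leaf.docBase)) {
            return OptionalLong.of(termDV.longValue());
        }
        return OptionalLong.empty();
    }
}
```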
if (seqNo <= engineConfig.getGlobalCheckpointSupplier().getAsLong()) {
    return OptionalLong.empty();
} else {
    final long term = VersionsAndSeqNoResolver.lookupPrimaryTerm(searcher.getDirectoryReader(), seqNo);
I think we can just put lookupPrimaryTerm in this class.
@s1monw I have addressed your comments. Could you please have another look?
LGTM
@bleskes should also look at this.
Looks great. I left some comments. The important one is about the testing.
if (failure.getExistingPrimaryTerm().isPresent()) {
    appliedOperations.add(rewriteOperationWithPrimaryTerm(sourceOp, failure.getExistingPrimaryTerm().getAsLong()));
} else {
    assert targetOp.seqNo() <= primary.getGlobalCheckpoint() : targetOp.seqNo() + " > " + primary.getGlobalCheckpoint();
I think this should also throw an exception so replication stops and we'll know about it (assertion is fine for testing too).
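Concretely, the quoted else-branch could fail loudly along these lines (a sketch; the exception type and message are illustrative, not the committed code):

```java
if (failure.getExistingPrimaryTerm().isPresent()) {
    appliedOperations.add(rewriteOperationWithPrimaryTerm(sourceOp, failure.getExistingPrimaryTerm().getAsLong()));
} else if (targetOp.seqNo() > primary.getGlobalCheckpoint()) {
    // an existing op above the global checkpoint must have a resolvable term;
    // fail the replication so the problem is surfaced rather than hidden
    throw new IllegalStateException("can not find primary term for existing op with seq_no ["
        + targetOp.seqNo() + "] above the global checkpoint [" + primary.getGlobalCheckpoint() + "]");
}
```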
            return OptionalLong.of(primaryTermDV.longValue());
        }
    }
    assert false : "seq_no[" + seqNo + "] does not have primary_term";
can we show how many docs we found? this assumes 0 but it might be 2.
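For example, counting matches while scanning would let the assertion report what it actually saw (docCount is a hypothetical local counter incremented in the loop above):

```java
// report the actual match count instead of implying it was zero
assert false : "seq_no[" + seqNo + "] expected one doc with a primary_term but found [" + docCount + "]";
```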
}
// Primary should reject duplicates
final long newTerm = randomLongBetween(oldTerm + 1, Long.MAX_VALUE);
for (Engine.Operation op : operations) {
    Engine.Result result = applyOperation(followingEngine, op, newTerm, Engine.Operation.Origin.PRIMARY);
    assertThat(result.getResultType(), equalTo(Engine.Result.Type.FAILURE));
    assertThat(result.getFailure(), instanceOf(AlreadyProcessedFollowingEngineException.class));
    AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure();
    assertThat(failure.getExistingPrimaryTerm().getAsLong(), equalTo(operationWithTerms.get(op.seqNo())));
This seems to mean we never deliver ops below the global checkpoint (and flush, etc.). Can we extend the test to cover that too?
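A sketch of the extra coverage being requested, assuming hypothetical test helpers (advanceGlobalCheckpoint, maxSeqNo) in the style of the test above; once the global checkpoint covers the ops, a duplicate is still rejected, but no existing term needs to be looked up because the op will never be re-sent:

```java
advanceGlobalCheckpoint(followingEngine, maxSeqNo(operations));
followingEngine.flush(true, true);
for (Engine.Operation op : operations) {
    Engine.Result result = applyOperation(followingEngine, op, newTerm, Engine.Operation.Origin.PRIMARY);
    assertThat(result.getResultType(), equalTo(Engine.Result.Type.FAILURE));
    assertThat(result.getFailure(), instanceOf(AlreadyProcessedFollowingEngineException.class));
    AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure();
    // below the global checkpoint no term lookup is performed, so none is reported
    assertThat(failure.getExistingPrimaryTerm().isPresent(), equalTo(false));
}
```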
@bleskes I've addressed your comments. Would you please take another look?
LGTM
This PR adds an assertion that the existing operation is identical to the operation being processed, except for the primary term. Relates elastic#34412
Since #34288, we might hit a deadlock if the FollowTask has more fetchers than writers. This can happen in the following scenario. Suppose the leader has two operations [seq#0, seq#1], and the FollowTask has two fetchers and one writer. 1. The FollowTask issues two concurrent fetch requests, {from_seq_no: 0, num_ops: 1} and {from_seq_no: 1, num_ops: 1}, to read seq#0 and seq#1 respectively. 2. The second request, which fetches seq#1, completes first and triggers a write request containing only seq#1. 3. The primary of the follower fails after it has replicated seq#1 to its replicas. 4. Since the old primary did not respond, the FollowTask resends the previous write request containing seq#1. 5. The new primary already has seq#1; thus it won't replicate seq#1 to the replicas but will wait for the global checkpoint to advance to at least seq#1. The problem is that the FollowTask has only one writer, and that writer is waiting for seq#0, which won't be delivered until the writer completes. This PR proposes to replicate existing operations with the old primary term (instead of the current term) on the follower. In particular, when the following primary detects that it has already processed an operation, it looks up the term of the existing operation with the same seq_no in the Lucene index, then rewrites the incoming operation with the old term before replicating it to the following replicas. This approach is wait-free but requires soft-deletes on the follower. Relates #34288
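The essence of the rewrite, with a toy Op class standing in for Translog.Operation (the real rewriteOperationWithPrimaryTerm from the diff must handle each concrete operation type: index, delete, no-op):

```java
final class Op {
    final long seqNo;
    final long primaryTerm;
    final byte[] source;

    Op(long seqNo, long primaryTerm, byte[] source) {
        this.seqNo = seqNo;
        this.primaryTerm = primaryTerm;
        this.source = source;
    }

    /** Same seq_no and payload; only the term is swapped for the one already indexed on the follower. */
    Op withPrimaryTerm(long existingTerm) {
        return new Op(seqNo, existingTerm, source);
    }
}
```

Because the rewritten operation carries the term already recorded under that seq_no, replaying it on the replicas is idempotent and the primary never has to block waiting for the global checkpoint.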
Since elastic#34412 and elastic#34474, a follower must have soft-deletes enabled to work correctly. This change requires soft-deletes on the follower. Relates elastic#34412 Relates elastic#34474
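A sketch of the kind of check that follow-up implies; the setting constant is Elasticsearch's real index.soft_deletes.enabled setting, but the surrounding method and message are invented for illustration:

```java
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;

final class SoftDeletesCheck {
    static void ensureSoftDeletesEnabled(String index, Settings indexSettings) {
        // reject follower indices that cannot retain operation history
        if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(indexSettings) == false) {
            throw new IllegalArgumentException(
                "index [" + index + "] does not have soft deletes enabled; following requires soft deletes");
        }
    }
}
```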
Since #34288, we might hit a deadlock if the FollowTask has more fetchers than writers. This can happen in the following scenario:

Suppose the leader has two operations [seq#0, seq#1]; the FollowTask has two fetchers and one writer.

1. The FollowTask issues two concurrent fetch requests, {from_seq_no: 0, num_ops: 1} and {from_seq_no: 1, num_ops: 1}, to read seq#0 and seq#1 respectively.
2. The second request, which fetches seq#1, completes first and triggers a write request containing only seq#1.
3. The primary of the follower fails after it has replicated seq#1 to its replicas.
4. Since the old primary did not respond, the FollowTask resends the previous write request containing seq#1.
5. The new primary already has seq#1; thus it won't replicate seq#1 to the replicas but will wait for the global checkpoint to advance to at least seq#1.

The problem is that the FollowTask has only one writer, and that writer is waiting for seq#0, which won't be delivered until the writer completes.

This PR proposes to replicate existing operations with the old primary term (instead of the current term) on the follower. In particular, when the following primary detects that it has already processed an operation, it looks up the term of the existing operation with the same seq_no in the Lucene index, then rewrites the incoming operation with the old term before replicating it to the following replicas. This approach is wait-free but requires soft-deletes on the follower. I will make a follow-up to enforce soft-deletes on the follower.

Relates #34288