CCR: Optimize indexing ops using seq_no on followers #34099

Merged
merged 14 commits into from
Sep 29, 2018
26 changes: 26 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -1834,6 +1834,32 @@ public interface TranslogRecoveryRunner {
* Returns the maximum sequence number of either update or delete operations that have been processed in this engine
* or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
* an update operation if it overwrites an existing document with the same document id in the Lucene index.
* <p>
* A note on the optimization using max_seq_no_of_updates_or_deletes:
* For each operation O, the key invariants are:
* <ol>
* <li> I1. There is no operation on docID(O) with seqno that is > MSU(O) and < seqno(O) </li>
* <li> I2. If MSU(O) < seqno(O) then docID(O) did not exist when O was applied; more precisely, if there is any O'
* with seqno(O') < seqno(O) and docID(O') = docID(O) then the one with the greatest seqno is a delete. </li>
* </ol>
* <p>
* When a receiving shard (either a replica or a follower) receives an operation O, it must first ensure its own MSU is >= MSU(O),
* and then compare its MSU to its local checkpoint (LCP). If LCP < MSU then there is a gap: there may be some operations that act
* on docID(O) about which we do not yet know, so we cannot perform an add. Note this also covers the case where a future operation O'
* with seqno(O') > seqno(O) and docID(O') = docID(O) is processed before O. In that case MSU(O') is at least seqno(O') and this
* means MSU >= seqno(O') > seqno(O) > LCP (because O wasn't processed yet).
* <p>
* However, if MSU <= LCP then there is no gap: we have processed every operation <= LCP, and no operation O' with seqno(O') > LCP
* and seqno(O') < seqno(O) also has docID(O') = docID(O), because such an operation would have seqno(O') > LCP >= MSU >= MSU(O)
* which contradicts the first invariant. Furthermore in this case we immediately know that docID(O) has been deleted
* (or never existed) without needing to check Lucene for the following reason. If there's no earlier operation on docID(O) then
* this is clear, so suppose instead that the preceding operation on docID(O) is O':
* 1. The first invariant above tells us that seqno(O') <= MSU(O) <= LCP so we have already applied O' to Lucene.
* 2. Also MSU(O) <= MSU <= LCP < seqno(O) (we discard O if seqno(O) <= LCP) so the second invariant applies,
* meaning that O' was a delete.
* <p>
* Moreover, operations that are optimized using the MSU optimization must not be processed twice, as this would create duplicates
Review comment (Contributor):

Can we add a sentence: "Therefore, if MSU <= LCP < seqno(O), we know that O can safely be optimized and added to Lucene with addDocument. Moreover, operations"...

* in Lucene. To avoid this, we check the local checkpoint tracker to see whether an operation has already been processed.
*
* @see #initializeMaxSeqNoOfUpdatesOrDeletes()
* @see #advanceMaxSeqNoOfUpdatesOrDeletes(long)
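
The decision rule described in the Javadoc above can be sketched as a small standalone class. This is only an illustration under stated assumptions, not the engine's actual code: `MsuOptimizationSketch`, `Strategy`, `plan` and `markProcessed` are hypothetical names, and the set of processed sequence numbers is a simplified stand-in for the local checkpoint tracker.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the follower-side decision; names are illustrative only. */
public class MsuOptimizationSketch {

    public enum Strategy { SKIP_ALREADY_PROCESSED, ADD_DOCUMENT, LOOKUP_THEN_INDEX }

    // Both values start at -1, meaning "nothing processed yet" in this sketch.
    private long localCheckpoint = -1;
    private long maxSeqNoOfUpdatesOrDeletes = -1;
    // Simplified stand-in for the local checkpoint tracker.
    private final Set<Long> processed = new HashSet<>();

    /** Chooses how to apply operation O, given seqno(O) and the MSU(O) carried with it. */
    public Strategy plan(long seqNo, long msuOfOperation) {
        // First ensure our own MSU is >= MSU(O).
        maxSeqNoOfUpdatesOrDeletes = Math.max(maxSeqNoOfUpdatesOrDeletes, msuOfOperation);
        // Never apply an operation twice: an optimized add would create a duplicate.
        if (seqNo <= localCheckpoint || processed.contains(seqNo)) {
            return Strategy.SKIP_ALREADY_PROCESSED;
        }
        // Here seqno(O) > LCP. If MSU <= LCP there is no gap, so a plain add is safe.
        if (maxSeqNoOfUpdatesOrDeletes <= localCheckpoint) {
            return Strategy.ADD_DOCUMENT;
        }
        // LCP < MSU: unseen operations may touch docID(O), so fall back to a lookup.
        return Strategy.LOOKUP_THEN_INDEX;
    }

    /** Records that seqNo was applied and advances the checkpoint over contiguous seqnos. */
    public void markProcessed(long seqNo) {
        processed.add(seqNo);
        while (processed.contains(localCheckpoint + 1)) {
            localCheckpoint++;
        }
    }

    public long localCheckpoint() {
        return localCheckpoint;
    }
}
```

In this sketch the add path is taken only when `MSU <= LCP < seqno(O)`, which matches the condition named in the review comment; every other unprocessed operation takes the slower lookup path.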
@@ -54,36 +54,7 @@ private void preFlight(final Operation operation) {
@Override
protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException {
preFlight(index);
/*
* A note about optimization using sequence numbers:
*
* 1. Indexing operations are processed concurrently in an engine. However, operations of the same docID are processed
* one by one under the docID lock.
*
* 2. Operations that are optimized using the MSU optimization will not be processed twice as this will create duplicates
* in Lucene. To avoid it we check the local checkpoint tracker to see if an operation was already processed.
*
* 3. When replicating operations to replicas or followers, we also carry the max_seq_no_of_updates_or_deletes on the
* leader to followers. This transfer guarantees that the MSU on a follower when operation O is processed is at least the
* MSU on the leader when it was executed [every operation O => MSU_r(O) >= MSU_p(O)].
*
* 4. The following proves that docID(O) does not exist on a follower when operation O is applied if MSU_r(O) <= LCP < seqno(O):
*
* 4.1) If such operation O' on the follower with docID(O') = docID(O) and LCP < seqno(O) < seqno(O') then MSU_p(O') >= seqno(O')
* because O' is an update or MSU_p(O') >= seqno(delete O") with docID(O") = docID(O) and seqno(O) < seqno(O") < seqno(O').
* MSU_r(O) >= MSU_r(O') because O' was delivered to the follower before O, and MSU_p(O') > seqno(O) > LCP in both cases.
* Thus MSU_r(O) >= MSU_r(O') >= MSU_p(O') > seqno(O) > LCP which contradicts the assumption [MSU_r(O) <= LCP].
*
* 4.2) If such operation O' in the history with docID(O') = docID(O), and LCP < seqno(O') < seqno(O) then MSU_p(O) >= seqno(O)
* because O is an update or MSU_p(O) >= seqno(delete O") with docID(O") = docID(O) and seqno(O') < seqno(O") < seqno(O).
* Thus MSU_r(O) >= MSU_p(O) > seqno(O') > LCP in both cases which contradicts the assumption [MSU_r(O) <= LCP].
*
* 4.3) MSU(O) < seqno(O) then docID(O) does not exist when O is applied on a leader. This means docID(O) does not exist
* after we apply every operation with docID = docID(O) and seqno < seqno(O). On the follower, we have applied every
* operation with seqno <= LCP, and there is no such O' in the history with docID(O') = docID(O) and LCP < seqno(O') [4.2].
* These mean the follower has applied every operation with docID = docID(O) and seqno < seqno(O).
* Thus docID(O) does not exist on the follower.
*/
// NOTE: refer to Engine#getMaxSeqNoOfUpdatesOrDeletes for an explanation of the optimization using sequence numbers.
final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
if (hasBeenProcessedBefore(index)) {