From 7f8737aa8b398e1f1e202ff8e666db6bc5a67222 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 11 Oct 2018 20:31:58 -0400 Subject: [PATCH 1/9] CCR: Delay write requests if gaps in write buffer Since an earlier change, we might hit a deadlock if the FollowTask has more fetchers than writers. This can happen in the following scenario: Suppose the leader has two operations [seq#0, seq#1]; the FollowTask has two fetchers and one writer. 1. The FollowTask issues two concurrent fetch requests: {from_seq_no: 0, num_ops:1} and {from_seq_no: 1, num_ops:1} to read seq#0 and seq#1 respectively. 2. The second request, which fetches seq#1, completes first and then triggers a write request containing only seq#1. 3. The primary of a follower fails after it has replicated seq#1 to replicas. 4. Since the old primary did not respond, the FollowTask issues another write request containing seq#1 (resending the previous write request). 5. The new primary has seq#1 already; thus it won't replicate seq#1 to replicas but will wait for the global checkpoint to advance to at least seq#1. The problem is that the FollowTask has only one writer and that writer is waiting for seq#0, which won't be delivered until the writer completes. This PR proposes to delay the write requests if there is a gap in the write-buffer. With this change, if a writer is waiting for seq_no N, then all the operations below N were delivered or were scheduled for delivery by other writers. 
--- .../xpack/ccr/action/ShardFollowNodeTask.java | 20 +++++++++++- .../xpack/ccr/ShardChangesIT.java | 3 ++ .../ccr/action/ShardFollowNodeTaskTests.java | 32 +++++++++++++++++++ 3 files changed, 54 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index a10ee10f22a7a..a6b1755e23e74 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -46,6 +46,7 @@ import java.util.function.Consumer; import java.util.function.LongConsumer; import java.util.function.LongSupplier; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -67,6 +68,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private long leaderMaxSeqNo; private long leaderMaxSeqNoOfUpdatesOrDeletes = SequenceNumbers.UNASSIGNED_SEQ_NO; private long lastRequestedSeqNo; + private long lastWrittenSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; private long followerGlobalCheckpoint = 0; private long followerMaxSeqNo = 0; private int numConcurrentReads = 0; @@ -127,6 +129,7 @@ void start( this.followerGlobalCheckpoint = followerGlobalCheckpoint; this.followerMaxSeqNo = followerMaxSeqNo; this.lastRequestedSeqNo = followerGlobalCheckpoint; + this.lastWrittenSeqNo = followerGlobalCheckpoint; } // updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical @@ -198,18 +201,28 @@ private synchronized void coordinateWrites() { return; } - while (hasWriteBudget() && buffer.isEmpty() == false) { + final Supplier hasNextSeqNoToWriteInBuffer = () -> { + assert lastWrittenSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO; + return buffer.isEmpty() == false && buffer.peek().seqNo() <= 
lastWrittenSeqNo + 1; + }; + + while (hasWriteBudget() && hasNextSeqNoToWriteInBuffer.get()) { long sumEstimatedSize = 0L; int length = Math.min(params.getMaxBatchOperationCount(), buffer.size()); List ops = new ArrayList<>(length); for (int i = 0; i < length; i++) { + if (hasNextSeqNoToWriteInBuffer.get() == false) { + break; + } Translog.Operation op = buffer.remove(); ops.add(op); + lastWrittenSeqNo = op.seqNo(); sumEstimatedSize += op.estimateSize(); if (sumEstimatedSize > params.getMaxBatchSize().getBytes()) { break; } } + assert ops.isEmpty() == false; numConcurrentWrites++; LOGGER.trace("{}[{}] write [{}/{}] [{}]", params.getFollowShardId(), numConcurrentWrites, ops.get(0).seqNo(), ops.get(ops.size() - 1).seqNo(), ops.size()); @@ -430,6 +443,11 @@ public ShardId getFollowShardId() { return params.getFollowShardId(); } + final synchronized long getLastWrittenSeqNo() { + // TODO: Remove this method and fold it to ShardFollowNodeTaskStatus (I'll do it in a follow-up). + return lastWrittenSeqNo; + } + @Override public synchronized ShardFollowNodeTaskStatus getStatus() { final long timeSinceLastFetchMillis; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index 639e2b8d0eb6f..d49c4d2b88b43 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -722,6 +722,9 @@ public void testFailOverOnFollower() throws Exception { threads[i].start(); } PutFollowAction.Request follow = follow("leader-index", "follower-index"); + follow.getFollowRequest().setMaxBatchOperationCount(randomIntBetween(1, 1000)); + follow.getFollowRequest().setMaxConcurrentReadBatches(randomIntBetween(1, 10)); + follow.getFollowRequest().setMaxConcurrentWriteBatches(randomIntBetween(1, 10)); client().execute(PutFollowAction.INSTANCE, follow).get(); 
ensureGreen("follower-index"); atLeastDocsIndexed("follower-index", between(20, 60)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index e772516e33128..c735f45443451 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -544,6 +544,7 @@ public void testCoordinateWrites() { public void testMaxConcurrentWrites() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 2, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 256L, -1L); ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -556,6 +557,7 @@ public void testMaxConcurrentWrites() { assertThat(status.numberOfConcurrentWrites(), equalTo(2)); task = createShardFollowTask(64, 1, 4, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 256L, -1L); response = generateShardChangesResponse(0, 256, 0L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -572,6 +574,7 @@ public void testMaxConcurrentWrites() { public void testMaxBatchOperationCount() { ShardFollowNodeTask task = createShardFollowTask(8, 1, 32, Integer.MAX_VALUE, Long.MAX_VALUE); + startTask(task, 256L, -1L); ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -696,6 +699,35 @@ public void testComputeDelay() { assertThat(ShardFollowNodeTask.computeDelay(1024, maxDelayInMillis), allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo(1000L))); } + public void testDelayWriteRequestIfGapInBuffer() { + ShardFollowNodeTask task = createShardFollowTask(10, 
4, 2, between(1, 1000), Long.MAX_VALUE); + long lastWrittenSeqNo = randomLongBetween(-1L, 5L); + startTask(task, 5L, lastWrittenSeqNo); + + task.innerHandleReadResponse(10, 19, generateShardChangesResponse(10, 19, 0, 256)); + assertThat(bulkShardOperationRequests, hasSize(0)); + assertThat(task.getLastWrittenSeqNo(), equalTo(lastWrittenSeqNo)); + + task.innerHandleReadResponse(30, 49, generateShardChangesResponse(30, 49, 0, 256)); + assertThat(bulkShardOperationRequests, hasSize(0)); + assertThat(task.getLastWrittenSeqNo(), equalTo(lastWrittenSeqNo)); + + followerGlobalCheckpoints.add(-1L); + followerGlobalCheckpoints.add(-1L); + task.innerHandleReadResponse(lastWrittenSeqNo + 1, 9, generateShardChangesResponse(lastWrittenSeqNo + 1, 9, 0, 256)); + assertThat(bulkShardOperationRequests, hasSize(2)); + assertThat(task.getLastWrittenSeqNo(), equalTo(19L)); + + followerGlobalCheckpoints.add(-1L); + task.innerHandleReadResponse(20, 24, generateShardChangesResponse(20, 24, 0, 256)); + assertThat(task.getLastWrittenSeqNo(), equalTo(24L)); + + followerGlobalCheckpoints.add(-1L); + followerGlobalCheckpoints.add(-1L); + task.innerHandleReadResponse(25, 30, generateShardChangesResponse(25, 30, 0, 256)); + assertThat(task.getLastWrittenSeqNo(), equalTo(49L)); + } + private ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxConcurrentReadBatches, int maxConcurrentWriteBatches, From 7d30491bfdd790ffbf8f668c649b1c61373cc8eb Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 11 Oct 2018 23:49:08 -0400 Subject: [PATCH 2/9] use loose form --- .../org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index a6b1755e23e74..d7d15145174cd 100644 --- 
a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -216,7 +216,7 @@ private synchronized void coordinateWrites() { } Translog.Operation op = buffer.remove(); ops.add(op); - lastWrittenSeqNo = op.seqNo(); + lastWrittenSeqNo = Math.max(lastWrittenSeqNo, op.seqNo()); sumEstimatedSize += op.estimateSize(); if (sumEstimatedSize > params.getMaxBatchSize().getBytes()) { break; From b2573251ec0b77ffa04a41163f3dffe05e8422dc Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 15 Oct 2018 08:24:29 -0400 Subject: [PATCH 3/9] backout delay writes --- .../xpack/ccr/action/ShardFollowNodeTask.java | 20 +----------- .../xpack/ccr/ShardChangesIT.java | 5 ++- .../ccr/action/ShardFollowNodeTaskTests.java | 32 ------------------- 3 files changed, 3 insertions(+), 54 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index d7d15145174cd..a10ee10f22a7a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -46,7 +46,6 @@ import java.util.function.Consumer; import java.util.function.LongConsumer; import java.util.function.LongSupplier; -import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -68,7 +67,6 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private long leaderMaxSeqNo; private long leaderMaxSeqNoOfUpdatesOrDeletes = SequenceNumbers.UNASSIGNED_SEQ_NO; private long lastRequestedSeqNo; - private long lastWrittenSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; private long followerGlobalCheckpoint = 0; private long followerMaxSeqNo = 0; private int numConcurrentReads = 0; @@ -129,7 
+127,6 @@ void start( this.followerGlobalCheckpoint = followerGlobalCheckpoint; this.followerMaxSeqNo = followerMaxSeqNo; this.lastRequestedSeqNo = followerGlobalCheckpoint; - this.lastWrittenSeqNo = followerGlobalCheckpoint; } // updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical @@ -201,28 +198,18 @@ private synchronized void coordinateWrites() { return; } - final Supplier hasNextSeqNoToWriteInBuffer = () -> { - assert lastWrittenSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO; - return buffer.isEmpty() == false && buffer.peek().seqNo() <= lastWrittenSeqNo + 1; - }; - - while (hasWriteBudget() && hasNextSeqNoToWriteInBuffer.get()) { + while (hasWriteBudget() && buffer.isEmpty() == false) { long sumEstimatedSize = 0L; int length = Math.min(params.getMaxBatchOperationCount(), buffer.size()); List ops = new ArrayList<>(length); for (int i = 0; i < length; i++) { - if (hasNextSeqNoToWriteInBuffer.get() == false) { - break; - } Translog.Operation op = buffer.remove(); ops.add(op); - lastWrittenSeqNo = Math.max(lastWrittenSeqNo, op.seqNo()); sumEstimatedSize += op.estimateSize(); if (sumEstimatedSize > params.getMaxBatchSize().getBytes()) { break; } } - assert ops.isEmpty() == false; numConcurrentWrites++; LOGGER.trace("{}[{}] write [{}/{}] [{}]", params.getFollowShardId(), numConcurrentWrites, ops.get(0).seqNo(), ops.get(ops.size() - 1).seqNo(), ops.size()); @@ -443,11 +430,6 @@ public ShardId getFollowShardId() { return params.getFollowShardId(); } - final synchronized long getLastWrittenSeqNo() { - // TODO: Remove this method and fold it to ShardFollowNodeTaskStatus (I'll do it in a follow-up). 
- return lastWrittenSeqNo; - } - @Override public synchronized ShardFollowNodeTaskStatus getStatus() { final long timeSinceLastFetchMillis; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index d49c4d2b88b43..134c4a3141563 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -373,6 +373,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfShards); } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34412") public void testFollowIndexAndCloseNode() throws Exception { internalCluster().ensureAtLeastNumDataNodes(3); String leaderIndexSettings = getIndexSettings(3, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); @@ -693,6 +694,7 @@ public void testUnfollowIndex() throws Exception { assertThat(client().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(2L)); } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34412") public void testFailOverOnFollower() throws Exception { int numberOfReplicas = between(1, 2); internalCluster().startMasterOnlyNode(); @@ -722,9 +724,6 @@ public void testFailOverOnFollower() throws Exception { threads[i].start(); } PutFollowAction.Request follow = follow("leader-index", "follower-index"); - follow.getFollowRequest().setMaxBatchOperationCount(randomIntBetween(1, 1000)); - follow.getFollowRequest().setMaxConcurrentReadBatches(randomIntBetween(1, 10)); - follow.getFollowRequest().setMaxConcurrentWriteBatches(randomIntBetween(1, 10)); client().execute(PutFollowAction.INSTANCE, follow).get(); ensureGreen("follower-index"); atLeastDocsIndexed("follower-index", between(20, 60)); diff --git 
a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index c735f45443451..e772516e33128 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -544,7 +544,6 @@ public void testCoordinateWrites() { public void testMaxConcurrentWrites() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 2, Integer.MAX_VALUE, Long.MAX_VALUE); - startTask(task, 256L, -1L); ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -557,7 +556,6 @@ public void testMaxConcurrentWrites() { assertThat(status.numberOfConcurrentWrites(), equalTo(2)); task = createShardFollowTask(64, 1, 4, Integer.MAX_VALUE, Long.MAX_VALUE); - startTask(task, 256L, -1L); response = generateShardChangesResponse(0, 256, 0L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -574,7 +572,6 @@ public void testMaxConcurrentWrites() { public void testMaxBatchOperationCount() { ShardFollowNodeTask task = createShardFollowTask(8, 1, 32, Integer.MAX_VALUE, Long.MAX_VALUE); - startTask(task, 256L, -1L); ShardChangesAction.Response response = generateShardChangesResponse(0, 256, 0L, 256L); // Also invokes coordinatesWrites() task.innerHandleReadResponse(0L, 64L, response); @@ -699,35 +696,6 @@ public void testComputeDelay() { assertThat(ShardFollowNodeTask.computeDelay(1024, maxDelayInMillis), allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo(1000L))); } - public void testDelayWriteRequestIfGapInBuffer() { - ShardFollowNodeTask task = createShardFollowTask(10, 4, 2, between(1, 1000), Long.MAX_VALUE); - long lastWrittenSeqNo = randomLongBetween(-1L, 5L); - 
startTask(task, 5L, lastWrittenSeqNo); - - task.innerHandleReadResponse(10, 19, generateShardChangesResponse(10, 19, 0, 256)); - assertThat(bulkShardOperationRequests, hasSize(0)); - assertThat(task.getLastWrittenSeqNo(), equalTo(lastWrittenSeqNo)); - - task.innerHandleReadResponse(30, 49, generateShardChangesResponse(30, 49, 0, 256)); - assertThat(bulkShardOperationRequests, hasSize(0)); - assertThat(task.getLastWrittenSeqNo(), equalTo(lastWrittenSeqNo)); - - followerGlobalCheckpoints.add(-1L); - followerGlobalCheckpoints.add(-1L); - task.innerHandleReadResponse(lastWrittenSeqNo + 1, 9, generateShardChangesResponse(lastWrittenSeqNo + 1, 9, 0, 256)); - assertThat(bulkShardOperationRequests, hasSize(2)); - assertThat(task.getLastWrittenSeqNo(), equalTo(19L)); - - followerGlobalCheckpoints.add(-1L); - task.innerHandleReadResponse(20, 24, generateShardChangesResponse(20, 24, 0, 256)); - assertThat(task.getLastWrittenSeqNo(), equalTo(24L)); - - followerGlobalCheckpoints.add(-1L); - followerGlobalCheckpoints.add(-1L); - task.innerHandleReadResponse(25, 30, generateShardChangesResponse(25, 30, 0, 256)); - assertThat(task.getLastWrittenSeqNo(), equalTo(49L)); - } - private ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxConcurrentReadBatches, int maxConcurrentWriteBatches, From 19be4d753241d411594ff70d66e24d7efd6b2c32 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 15 Oct 2018 09:55:34 -0400 Subject: [PATCH 4/9] Replicate existing ops with old term on follower --- .../index/engine/InternalEngine.java | 13 +- .../TransportBulkShardOperationsAction.java | 114 +++++++-------- ...eadyProcessedFollowingEngineException.java | 19 ++- .../ccr/index/engine/FollowingEngine.java | 74 +++++++++- .../ShardFollowTaskReplicationTests.java | 5 +- .../action/bulk/BulkShardOperationsTests.java | 137 ++++++------------ .../index/engine/FollowingEngineTests.java | 16 +- 7 files changed, 201 insertions(+), 177 deletions(-) diff --git 
a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 55d93203abeea..cd5e7f5b1062b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2410,9 +2410,7 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException { // TODO: Should we defer the refresh until we really need it? ensureOpen(); - if (lastRefreshedCheckpoint() < toSeqNo) { - refresh(source, SearcherScope.INTERNAL); - } + refreshIfNeeded(source, toSeqNo); Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL); try { LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot( @@ -2522,6 +2520,15 @@ final long lastRefreshedCheckpoint() { return lastRefreshedCheckpointListener.refreshedCheckpoint.get(); } + /** + * Refresh this engine **internally** iff the requesting seq_no is greater than the last refreshed checkpoint. 
+ */ + protected final void refreshIfNeeded(String source, long requestingSeqNo) { + if (lastRefreshedCheckpoint() < requestingSeqNo) { + refresh(source, SearcherScope.INTERNAL); + } + } + private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener { final AtomicLong refreshedCheckpoint; private long pendingCheckpoint; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 6d5df143eeae0..71bc165bf3afa 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -31,7 +31,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.function.Function; public class TransportBulkShardOperationsAction extends TransportWriteAction { @@ -68,6 +67,41 @@ protected WritePrimaryResult rewriteWithTerm = operation -> { - final Translog.Operation operationWithPrimaryTerm; - switch (operation.opType()) { - case INDEX: - final Translog.Index index = (Translog.Index) operation; - operationWithPrimaryTerm = new Translog.Index( - index.type(), - index.id(), - index.seqNo(), - primary.getOperationPrimaryTerm(), - index.version(), - BytesReference.toBytes(index.source()), - index.routing(), - index.getAutoGeneratedIdTimestamp()); - break; - case DELETE: - final Translog.Delete delete = (Translog.Delete) operation; - operationWithPrimaryTerm = new Translog.Delete( - delete.type(), - delete.id(), - delete.uid(), - delete.seqNo(), - primary.getOperationPrimaryTerm(), - delete.version()); - break; - case NO_OP: - final Translog.NoOp noOp = (Translog.NoOp) operation; - operationWithPrimaryTerm = new Translog.NoOp(noOp.seqNo(), 
primary.getOperationPrimaryTerm(), noOp.reason()); - break; - default: - throw new IllegalStateException("unexpected operation type [" + operation.opType() + "]"); - } - return operationWithPrimaryTerm; - }; - assert maxSeqNoOfUpdatesOrDeletes >= SequenceNumbers.NO_OPS_PERFORMED : "invalid msu [" + maxSeqNoOfUpdatesOrDeletes + "]"; primary.advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes); final List appliedOperations = new ArrayList<>(sourceOperations.size()); Translog.Location location = null; - long waitingForGlobalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; for (Translog.Operation sourceOp : sourceOperations) { - final Translog.Operation targetOp = rewriteWithTerm.apply(sourceOp); + final Translog.Operation targetOp = rewriteOperationWithPrimaryTerm(sourceOp, primary.getOperationPrimaryTerm()); final Engine.Result result = primary.applyTranslogOperation(targetOp, Engine.Operation.Origin.PRIMARY); if (result.getResultType() == Engine.Result.Type.SUCCESS) { assert result.getSeqNo() == targetOp.seqNo(); @@ -131,23 +129,25 @@ public static CcrWritePrimaryResult shardOperationOnPrimary( location = locationToSync(location, result.getTranslogLocation()); } else { if (result.getFailure() instanceof AlreadyProcessedFollowingEngineException) { - // Skipped operations will be delivered to replicas via primary-replica resync or peer-recovery. - // The primary must not acknowledge this request until the global checkpoint is at least the highest - // seqno of all skipped operations (i.e., all skipped operations have been processed on every replica). - waitingForGlobalCheckpoint = SequenceNumbers.max(waitingForGlobalCheckpoint, targetOp.seqNo()); + // The existing operations below the global checkpoint won't be replicated as they all were processed + // in every replicas already. 
However, the existing operations after the global checkpoint will be + // replicated to replicas but with the existing primary term (not the current primary term) in order + // to guarantee the consistency between the primary and replicas, and between translog and Lucene index. + final AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure(); + if (failure.getExistingPrimaryTerm().isPresent()) { + appliedOperations.add(rewriteOperationWithPrimaryTerm(sourceOp, failure.getExistingPrimaryTerm().getAsLong())); + } else { + assert targetOp.seqNo() <= primary.getGlobalCheckpoint() : targetOp.seqNo() + " > " + primary.getGlobalCheckpoint(); + } } else { assert false : "Only already-processed error should happen; op=[" + targetOp + "] error=[" + result.getFailure() + "]"; throw ExceptionsHelper.convertToElastic(result.getFailure()); } } } - assert appliedOperations.size() == sourceOperations.size() || waitingForGlobalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO : - "waiting global checkpoint is not assigned; waiting_gcp=" + waitingForGlobalCheckpoint + - " source_ops=" + sourceOperations.size() + " applied_ops=" + sourceOperations.size(); - assert appliedOperations.size() == 0 || location != null; final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest( shardId, historyUUID, appliedOperations, maxSeqNoOfUpdatesOrDeletes); - return new CcrWritePrimaryResult(replicaRequest, location, primary, waitingForGlobalCheckpoint, logger); + return new CcrWritePrimaryResult(replicaRequest, location, primary, logger); } @Override @@ -184,12 +184,8 @@ protected BulkShardOperationsResponse newResponseInstance() { * Custom write result to include global checkpoint after ops have been replicated. 
*/ static final class CcrWritePrimaryResult extends WritePrimaryResult { - final long waitingForGlobalCheckpoint; - - CcrWritePrimaryResult(BulkShardOperationsRequest request, Translog.Location location, IndexShard primary, - long waitingForGlobalCheckpoint, Logger logger) { + CcrWritePrimaryResult(BulkShardOperationsRequest request, Translog.Location location, IndexShard primary, Logger logger) { super(request, new BulkShardOperationsResponse(), location, null, primary, logger); - this.waitingForGlobalCheckpoint = waitingForGlobalCheckpoint; } @Override @@ -201,19 +197,7 @@ public synchronized void respond(ActionListener lis response.setMaxSeqNo(seqNoStats.getMaxSeqNo()); listener.onResponse(response); }, listener::onFailure); - - if (waitingForGlobalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO) { - primary.addGlobalCheckpointListener(waitingForGlobalCheckpoint, (gcp, e) -> { - if (e != null) { - listener.onFailure(e); - } else { - assert waitingForGlobalCheckpoint <= gcp : waitingForGlobalCheckpoint + " > " + gcp; - super.respond(wrappedListener); - } - }, null); - } else { - super.respond(wrappedListener); - } + super.respond(wrappedListener); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java index 9e19c93b2867a..d19bd4b63628e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java @@ -9,8 +9,23 @@ import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.shard.ShardId; +import java.util.OptionalLong; + public final class AlreadyProcessedFollowingEngineException extends VersionConflictEngineException { - 
AlreadyProcessedFollowingEngineException(ShardId shardId, long seqNo) { - super(shardId, "operation [{}] was processed before", null, seqNo); + private final long seqNo; + private final OptionalLong existingPrimaryTerm; + + AlreadyProcessedFollowingEngineException(ShardId shardId, long seqNo, OptionalLong existingPrimaryTerm) { + super(shardId, "operation [{}] was processed before with term [{}]", null, seqNo, existingPrimaryTerm); + this.seqNo = seqNo; + this.existingPrimaryTerm = existingPrimaryTerm; + } + + public long getSeqNo() { + return seqNo; + } + + public OptionalLong getExistingPrimaryTerm() { + return existingPrimaryTerm; } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 8a413ce498066..03846cd09cc94 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -5,14 +5,27 @@ */ package org.elasticsearch.xpack.ccr.index.engine; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.seqno.SequenceNumbers; import 
org.elasticsearch.xpack.ccr.CcrSettings; import java.io.IOException; +import java.util.OptionalLong; /** * An engine implementation for following shards. @@ -62,13 +75,13 @@ protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Ind /* * The existing operation in this engine was probably assigned the term of the previous primary shard which is different * from the term of the current operation. If the current operation arrives on replicas before the previous operation, - * then the Lucene content between the primary and replicas are not identical (primary terms are different). Since the - * existing operations are guaranteed to be replicated to replicas either via peer-recovery or primary-replica resync, - * we can safely skip this operation here and let the caller know the decision via AlreadyProcessedFollowingEngineException. - * The caller then waits for the global checkpoint to advance at least the seq_no of this operation to make sure that - * the existing operation was replicated to all replicas (see TransportBulkShardOperationsAction#shardOperationOnPrimary). + * then the Lucene content between the primary and replicas are not identical (primary terms are different). We can safely + * skip the existing operations below the global checkpoint, however must replicate the ones above the global checkpoint + * but with the previous primary term (not the current term of the operation) in order to guarantee the consistency + * between the primary and replicas (see TransportBulkShardOperationsAction#shardOperationOnPrimary). 
*/ - final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(shardId, index.seqNo()); + final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException( + shardId, index.seqNo(), findExistingPrimaryTerm(index.seqNo())); return IndexingStrategy.skipDueToVersionConflict(error, false, index.version(), index.primaryTerm()); } else { return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); @@ -88,7 +101,8 @@ protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Del preFlight(delete); if (delete.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(delete)) { // See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation. - final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(shardId, delete.seqNo()); + final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException( + shardId, delete.seqNo(), findExistingPrimaryTerm(delete.seqNo())); return DeletionStrategy.skipDueToVersionConflict(error, delete.version(), delete.primaryTerm(), false); } else { return planDeletionAsNonPrimary(delete); @@ -126,6 +140,52 @@ protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) { return true; } + private OptionalLong findExistingPrimaryTerm(final long seqNo) throws IOException { + refreshIfNeeded("find_primary_term", seqNo); + try (Searcher engineSearcher = acquireSearcher("find_primary_term", SearcherScope.INTERNAL)) { + // We have to acquire a searcher before execute this check to ensure that the requesting seq_no is always found in the else + // branch. If the operation is at most the global checkpoint, we should not look up its term as we may have merged away the + // operation. Moreover, we won't need to replicate this operation to replicas since it was processed on every copies already. 
+ if (seqNo <= engineConfig.getGlobalCheckpointSupplier().getAsLong()) { + return OptionalLong.empty(); + } else { + final DirectoryReader reader = Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()); + final IndexSearcher searcher = new IndexSearcher(reader); + searcher.setQueryCache(null); + final Query query = LongPoint.newExactQuery(SeqNoFieldMapper.NAME, seqNo); + final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f); + // iterate backwards since the existing operation is likely in the most recent segments. + for (int i = reader.leaves().size() - 1; i >= 0; i--) { + final LeafReaderContext leafContext = reader.leaves().get(i); + final Scorer scorer = weight.scorer(leafContext); + if (scorer == null) { + continue; + } + final NumericDocValues primaryTermDV = leafContext.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); + if (primaryTermDV == null) { + throw new IllegalStateException("seq_no[" + seqNo + "] does not have primary_term"); + } + final DocIdSetIterator docIdSetIterator = scorer.iterator(); + int docId; + while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + // make sure to skip non-root nested documents + if (primaryTermDV.advanceExact(docId - leafContext.docBase) && primaryTermDV.longValue() > 0) { + return OptionalLong.of(primaryTermDV.longValue()); + } + } + } + throw new IllegalStateException("seq_no[" + seqNo + "] is not retained"); + } + } catch (IOException e) { + try { + maybeFailEngine("find_primary_term", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw e; + } + } + /** * Returns the number of indexing operations that have been optimized (bypass version lookup) using sequence numbers in this engine. * This metric is not persisted, and started from 0 when the engine is opened. 
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 5539bc6ae4764..1b0270a54a501 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -227,7 +227,6 @@ public void testChangeFollowerHistoryUUID() throws Exception { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34412") public void testRetryBulkShardOperations() throws Exception { try (ReplicationGroup leaderGroup = createGroup(between(0, 1)); ReplicationGroup followerGroup = createFollowGroup(between(1, 3))) { @@ -304,7 +303,9 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { private ReplicationGroup createFollowGroup(int replicas) throws IOException { Settings.Builder settingsBuilder = Settings.builder(); - settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); + settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB)); return createGroup(replicas, settingsBuilder.build()); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java index dfacb96c31cca..283cf6bf42c36 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java @@ -7,18 +7,18 @@ package 
org.elasticsearch.xpack.ccr.action.bulk; import org.apache.lucene.index.Term; -import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Randomness; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; @@ -29,6 +29,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction.rewriteOperationWithPrimaryTerm; import static org.hamcrest.Matchers.equalTo; import static org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction.CcrWritePrimaryResult; @@ -87,60 +90,11 @@ public void testPrimaryTermFromFollower() throws IOException { closeShards(followerPrimary); } - public void testPrimaryResultWaitForGlobalCheckpoint() throws Exception { - final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(); - final IndexShard shard = newStartedShard(false, settings, new FollowingEngineFactory()); - int numOps = between(1, 100); - for (int i = 
0; i < numOps; i++) { - final String id = Integer.toString(i); - final Translog.Operation op; - if (randomBoolean()) { - op = new Translog.Index("_doc", id, i, primaryTerm, 0, SOURCE, null, -1); - } else if (randomBoolean()) { - shard.advanceMaxSeqNoOfUpdatesOrDeletes(i); - op = new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), i, primaryTerm, 0); - } else { - op = new Translog.NoOp(i, primaryTerm, "test"); - } - shard.applyTranslogOperation(op, Engine.Operation.Origin.REPLICA); - } - BulkShardOperationsRequest request = new BulkShardOperationsRequest(); - { - PlainActionFuture listener = new PlainActionFuture<>(); - CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, -2, logger); - primaryResult.respond(listener); - assertThat("should return intermediately if waiting_global_checkpoint is not specified", listener.isDone(), equalTo(true)); - assertThat(listener.get().getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo())); - } - { - PlainActionFuture listener = new PlainActionFuture<>(); - long waitingForGlobalCheckpoint = randomLongBetween(shard.getGlobalCheckpoint() + 1, shard.getLocalCheckpoint()); - CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, waitingForGlobalCheckpoint, logger); - primaryResult.respond(listener); - assertThat(listener.isDone(), equalTo(false)); - expectThrows(ElasticsearchTimeoutException.class, () -> listener.actionGet(TimeValue.timeValueMillis(1))); - - shard.updateGlobalCheckpointOnReplica(randomLongBetween(shard.getGlobalCheckpoint(), waitingForGlobalCheckpoint - 1), "test"); - expectThrows(ElasticsearchTimeoutException.class, () -> listener.actionGet(TimeValue.timeValueMillis(1))); - - shard.updateGlobalCheckpointOnReplica(randomLongBetween(waitingForGlobalCheckpoint, shard.getLocalCheckpoint()), "test"); - assertThat(listener.get().getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo())); - assertThat(listener.get().getGlobalCheckpoint(), 
equalTo(shard.getGlobalCheckpoint())); - } - { - PlainActionFuture listener = new PlainActionFuture<>(); - long waitingForGlobalCheckpoint = randomLongBetween(-1, shard.getGlobalCheckpoint()); - CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, waitingForGlobalCheckpoint, logger); - primaryResult.respond(listener); - assertThat(listener.get().getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo())); - assertThat(listener.get().getGlobalCheckpoint(), equalTo(shard.getGlobalCheckpoint())); - } - closeShards(shard); - } - public void testPrimaryResultIncludeOnlyAppliedOperations() throws Exception { - final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(); - final IndexShard primary = newStartedShard(true, settings, new FollowingEngineFactory()); + final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build(); + final IndexShard oldPrimary = newStartedShard(true, settings, new FollowingEngineFactory()); + final long oldPrimaryTerm = oldPrimary.getOperationPrimaryTerm(); long seqno = 0; List firstBulk = new ArrayList<>(); List secondBulk = new ArrayList<>(); @@ -157,46 +111,41 @@ public void testPrimaryResultIncludeOnlyAppliedOperations() throws Exception { } else { secondBulk.add(op); } + if (rarely()) { + oldPrimary.refresh("test"); + } + if (rarely()) { + oldPrimary.flush(new FlushRequest()); + } } Randomness.shuffle(firstBulk); Randomness.shuffle(secondBulk); - primary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno); - - final CcrWritePrimaryResult fullResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), - primary.getHistoryUUID(), firstBulk, seqno, primary, logger); + oldPrimary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno); + final CcrWritePrimaryResult fullResult = 
TransportBulkShardOperationsAction.shardOperationOnPrimary(oldPrimary.shardId(), + oldPrimary.getHistoryUUID(), firstBulk, seqno, oldPrimary, logger); assertThat(fullResult.replicaRequest().getOperations(), - equalTo(rewriteWithPrimaryTerm(firstBulk, primary.getOperationPrimaryTerm()))); - assertThat(fullResult.waitingForGlobalCheckpoint, equalTo(-2L)); - - // This bulk includes some operations from the first bulk. These operations should not be included in the result. + equalTo(firstBulk.stream().map(op -> rewriteOperationWithPrimaryTerm(op, oldPrimaryTerm)).collect(Collectors.toList()))); + primaryTerm = randomLongBetween(primaryTerm, primaryTerm + 10); + final IndexShard newPrimary = reinitShard(oldPrimary); + DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + newPrimary.markAsRecovering("store", new RecoveryState(newPrimary.routingEntry(), localNode, null)); + assertTrue(newPrimary.recoverFromStore()); + IndexShardTestCase.updateRoutingEntry(newPrimary, newPrimary.routingEntry().moveToStarted()); + newPrimary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno); + // The second bulk includes some operations from the first bulk which were processed already; + // only a subset of these operations will be included the result but with the old primary term. 
final List existingOps = randomSubsetOf(firstBulk); - final CcrWritePrimaryResult partialResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), - primary.getHistoryUUID(), Stream.concat(existingOps.stream(), secondBulk.stream()).collect(Collectors.toList()), - seqno, primary, logger); - assertThat(partialResult.replicaRequest().getOperations(), - equalTo(rewriteWithPrimaryTerm(secondBulk, primary.getOperationPrimaryTerm()))); - assertThat(partialResult.waitingForGlobalCheckpoint, - equalTo(existingOps.stream().mapToLong(Translog.Operation::seqNo).max().orElse(-2L))); - - closeShards(primary); - } - - private List rewriteWithPrimaryTerm(List sourceOperations, long primaryTerm) { - return sourceOperations.stream().map(op -> { - switch (op.opType()) { - case INDEX: - final Translog.Index index = (Translog.Index) op; - return new Translog.Index(index.type(), index.id(), index.seqNo(), primaryTerm, - index.version(), BytesReference.toBytes(index.source()), index.routing(), index.getAutoGeneratedIdTimestamp()); - case DELETE: - final Translog.Delete delete = (Translog.Delete) op; - return new Translog.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), primaryTerm, delete.version()); - case NO_OP: - final Translog.NoOp noOp = (Translog.NoOp) op; - return new Translog.NoOp(noOp.seqNo(), primaryTerm, noOp.reason()); - default: - throw new IllegalStateException("unexpected operation type [" + op.opType() + "]"); - } - }).collect(Collectors.toList()); + final CcrWritePrimaryResult partialResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(newPrimary.shardId(), + newPrimary.getHistoryUUID(), Stream.concat(secondBulk.stream(), existingOps.stream()).collect(Collectors.toList()), + seqno, newPrimary, logger); + final long newPrimaryTerm = newPrimary.getOperationPrimaryTerm(); + final long globalCheckpoint = newPrimary.getGlobalCheckpoint(); + final List appliedOperations = Stream.concat( + secondBulk.stream().map(op -> 
rewriteOperationWithPrimaryTerm(op, newPrimaryTerm)), + existingOps.stream().filter(op -> op.seqNo() > globalCheckpoint).map(op -> rewriteOperationWithPrimaryTerm(op, oldPrimaryTerm)) + ).collect(Collectors.toList()); + + assertThat(partialResult.replicaRequest().getOperations(), equalTo(appliedOperations)); + closeShards(newPrimary); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index ec59e4c5b1d31..4bb92c32db9e5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -46,8 +46,10 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -555,7 +557,8 @@ public void close() throws IOException { public void testProcessOnceOnPrimary() throws Exception { final Settings settings = Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0) - .put("index.version.created", Version.CURRENT).put("index.xpack.ccr.following_index", true).build(); + .put("index.version.created", Version.CURRENT).put("index.xpack.ccr.following_index", true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build(); final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build(); final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings); int numOps = between(10, 100); @@ -576,9 +579,12 @@ public void testProcessOnceOnPrimary() throws Exception { try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { 
followingEngine.advanceMaxSeqNoOfUpdatesOrDeletes(operations.size() - 1L); final long oldTerm = randomLongBetween(1, Integer.MAX_VALUE); + final Map operationWithTerms = new HashMap<>(); for (Engine.Operation op : operations) { - Engine.Result result = applyOperation(followingEngine, op, oldTerm, randomFrom(Engine.Operation.Origin.values())); + long term = randomLongBetween(1, oldTerm); + Engine.Result result = applyOperation(followingEngine, op, term, randomFrom(Engine.Operation.Origin.values())); assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS)); + operationWithTerms.put(op.seqNo(), term); } // Primary should reject duplicates final long newTerm = randomLongBetween(oldTerm + 1, Long.MAX_VALUE); @@ -586,9 +592,11 @@ public void testProcessOnceOnPrimary() throws Exception { Engine.Result result = applyOperation(followingEngine, op, newTerm, Engine.Operation.Origin.PRIMARY); assertThat(result.getResultType(), equalTo(Engine.Result.Type.FAILURE)); assertThat(result.getFailure(), instanceOf(AlreadyProcessedFollowingEngineException.class)); + AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure(); + assertThat(failure.getExistingPrimaryTerm().getAsLong(), equalTo(operationWithTerms.get(op.seqNo()))); } for (DocIdSeqNoAndTerm docId : getDocIds(followingEngine, true)) { - assertThat(docId.getPrimaryTerm(), equalTo(oldTerm)); + assertThat(docId.getPrimaryTerm(), equalTo(operationWithTerms.get(docId.getSeqNo()))); } // Replica should accept duplicates primaryTerm.set(newTerm); @@ -600,7 +608,7 @@ public void testProcessOnceOnPrimary() throws Exception { assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS)); } for (DocIdSeqNoAndTerm docId : getDocIds(followingEngine, true)) { - assertThat(docId.getPrimaryTerm(), equalTo(oldTerm)); + assertThat(docId.getPrimaryTerm(), equalTo(operationWithTerms.get(docId.getSeqNo()))); } } } From 9e75f5d161b9e491c7e572a2ec6fea80e9c7f6ec Mon Sep 
17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 16 Oct 2018 11:29:17 -0400 Subject: [PATCH 5/9] unmute tests --- .../test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 42dcbc8a328d3..3be0c31bce9cf 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -270,7 +270,6 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfShards); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34412") public void testFollowIndexAndCloseNode() throws Exception { getFollowerCluster().ensureAtLeastNumDataNodes(3); String leaderIndexSettings = getIndexSettings(3, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); @@ -618,7 +617,6 @@ public void testUnfollowIndex() throws Exception { assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(2L)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34412") public void testFailOverOnFollower() throws Exception { int numberOfReplicas = between(1, 2); getFollowerCluster().startMasterOnlyNode(); From 58c0686354b5ddc03eb6a3f4d43cadaa32dc51c1 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 16 Oct 2018 11:37:13 -0400 Subject: [PATCH 6/9] move to seq_no resolver --- .../lucene/uid/VersionsAndSeqNoResolver.java | 44 +++++++++++++++ .../TransportBulkShardOperationsAction.java | 5 +- .../ccr/index/engine/FollowingEngine.java | 54 ++++--------------- 3 files changed, 57 insertions(+), 46 deletions(-) diff --git 
a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java index 9db7e3716d51a..1a693870f92fb 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java @@ -19,12 +19,21 @@ package org.elasticsearch.common.lucene.uid; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.Term; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; import org.apache.lucene.util.CloseableThreadLocal; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.mapper.SeqNoFieldMapper; @@ -193,4 +202,39 @@ public static long loadVersion(IndexReader reader, Term term) throws IOException final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term); return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version; } + + /** + * Looks up the primary term for a given seq_no in the provided directory reader. The caller must ensure that an operation with the + * given {@code seqNo} exists the provided {@code directoryReader}; otherwise this method will throw {@link IllegalStateException}. 
+ */ + public static long lookupPrimaryTerm(final DirectoryReader directoryReader, final long seqNo) throws IOException { + final DirectoryReader reader = Lucene.wrapAllDocsLive(directoryReader); + final IndexSearcher searcher = new IndexSearcher(reader); + searcher.setQueryCache(null); + final Query query = LongPoint.newExactQuery(SeqNoFieldMapper.NAME, seqNo); + final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f); + // iterate backwards since the existing operation is likely in the most recent segments. + for (int i = reader.leaves().size() - 1; i >= 0; i--) { + final LeafReaderContext leaf = reader.leaves().get(i); + final Scorer scorer = weight.scorer(leaf); + if (scorer == null) { + continue; + } + final NumericDocValues primaryTermDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); + if (primaryTermDV == null) { + assert false : "seq_no[" + seqNo + "] does not have primary_term"; + throw new IllegalStateException("seq_no[" + seqNo + "] does not have primary_term"); + } + final DocIdSetIterator docIdSetIterator = scorer.iterator(); + int docId; + while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + // make sure to skip the non-root nested documents + if (primaryTermDV.advanceExact(docId - leaf.docBase) && primaryTermDV.longValue() > 0) { + return primaryTermDV.longValue(); + } + } + } + assert false : "primary term for seq_no[" + seqNo + "] is not found"; + throw new IllegalStateException("primary term for seq_no[" + seqNo + "] is not found"); + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 71bc165bf3afa..15f8c6cf97f5a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ 
b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -129,11 +129,12 @@ public static CcrWritePrimaryResult shardOperationOnPrimary( location = locationToSync(location, result.getTranslogLocation()); } else { if (result.getFailure() instanceof AlreadyProcessedFollowingEngineException) { - // The existing operations below the global checkpoint won't be replicated as they all were processed - // in every replicas already. However, the existing operations after the global checkpoint will be + // The existing operations below the global checkpoint won't be replicated as they were processed + // in every replicas already. However, the existing operations above the global checkpoint will be // replicated to replicas but with the existing primary term (not the current primary term) in order // to guarantee the consistency between the primary and replicas, and between translog and Lucene index. final AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure(); + assert failure.getSeqNo() == targetOp.seqNo() : targetOp.seqNo() + " != " + failure.getSeqNo(); if (failure.getExistingPrimaryTerm().isPresent()) { appliedOperations.add(rewriteOperationWithPrimaryTerm(sourceOp, failure.getExistingPrimaryTerm().getAsLong())); } else { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 03846cd09cc94..2ff893128d665 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -5,22 +5,11 @@ */ package org.elasticsearch.xpack.ccr.index.engine; -import org.apache.lucene.document.LongPoint; -import org.apache.lucene.index.DirectoryReader; -import 
org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.NumericDocValues; -import org.apache.lucene.search.DocIdSetIterator; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.Scorer; -import org.apache.lucene.search.Weight; -import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.InternalEngine; -import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.xpack.ccr.CcrSettings; @@ -81,7 +70,7 @@ protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Ind * between the primary and replicas (see TransportBulkShardOperationsAction#shardOperationOnPrimary). */ final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException( - shardId, index.seqNo(), findExistingPrimaryTerm(index.seqNo())); + shardId, index.seqNo(), lookupPrimaryTerm(index.seqNo())); return IndexingStrategy.skipDueToVersionConflict(error, false, index.version(), index.primaryTerm()); } else { return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); @@ -102,7 +91,7 @@ protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Del if (delete.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(delete)) { // See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation. 
final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException( - shardId, delete.seqNo(), findExistingPrimaryTerm(delete.seqNo())); + shardId, delete.seqNo(), lookupPrimaryTerm(delete.seqNo())); return DeletionStrategy.skipDueToVersionConflict(error, delete.version(), delete.primaryTerm(), false); } else { return planDeletionAsNonPrimary(delete); @@ -140,45 +129,22 @@ protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) { return true; } - private OptionalLong findExistingPrimaryTerm(final long seqNo) throws IOException { - refreshIfNeeded("find_primary_term", seqNo); - try (Searcher engineSearcher = acquireSearcher("find_primary_term", SearcherScope.INTERNAL)) { + private OptionalLong lookupPrimaryTerm(final long seqNo) throws IOException { + refreshIfNeeded("lookup_primary_term", seqNo); + try (Searcher searcher = acquireSearcher("lookup_primary_term", SearcherScope.INTERNAL)) { // We have to acquire a searcher before execute this check to ensure that the requesting seq_no is always found in the else // branch. If the operation is at most the global checkpoint, we should not look up its term as we may have merged away the // operation. Moreover, we won't need to replicate this operation to replicas since it was processed on every copies already. if (seqNo <= engineConfig.getGlobalCheckpointSupplier().getAsLong()) { return OptionalLong.empty(); } else { - final DirectoryReader reader = Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()); - final IndexSearcher searcher = new IndexSearcher(reader); - searcher.setQueryCache(null); - final Query query = LongPoint.newExactQuery(SeqNoFieldMapper.NAME, seqNo); - final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f); - // iterate backwards since the existing operation is likely in the most recent segments. 
- for (int i = reader.leaves().size() - 1; i >= 0; i--) { - final LeafReaderContext leafContext = reader.leaves().get(i); - final Scorer scorer = weight.scorer(leafContext); - if (scorer == null) { - continue; - } - final NumericDocValues primaryTermDV = leafContext.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); - if (primaryTermDV == null) { - throw new IllegalStateException("seq_no[" + seqNo + "] does not have primary_term"); - } - final DocIdSetIterator docIdSetIterator = scorer.iterator(); - int docId; - while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { - // make sure to skip non-root nested documents - if (primaryTermDV.advanceExact(docId - leafContext.docBase) && primaryTermDV.longValue() > 0) { - return OptionalLong.of(primaryTermDV.longValue()); - } - } - } - throw new IllegalStateException("seq_no[" + seqNo + "] is not retained"); + final long term = VersionsAndSeqNoResolver.lookupPrimaryTerm(searcher.getDirectoryReader(), seqNo); + assert term > 0L : "seq_no=" + seqNo + " term=" + term; + return OptionalLong.of(term); } } catch (IOException e) { try { - maybeFailEngine("find_primary_term", e); + maybeFailEngine("lookup_primary_term", e); } catch (Exception inner) { e.addSuppressed(inner); } From 6bf3f1de97b89bdf59cf833faacfc95f2b59556e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 16 Oct 2018 14:44:29 -0400 Subject: [PATCH 7/9] use docId directly --- .../common/lucene/uid/VersionsAndSeqNoResolver.java | 2 +- .../xpack/ccr/index/engine/FollowingEngineTests.java | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java index 1a693870f92fb..057c50d406685 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java +++ 
b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java @@ -229,7 +229,7 @@ public static long lookupPrimaryTerm(final DirectoryReader directoryReader, fina int docId; while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { // make sure to skip the non-root nested documents - if (primaryTermDV.advanceExact(docId - leaf.docBase) && primaryTermDV.longValue() > 0) { + if (primaryTermDV.advanceExact(docId) && primaryTermDV.longValue() > 0) { return primaryTermDV.longValue(); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 4bb92c32db9e5..6d63d06d4993c 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -574,17 +574,21 @@ public void testProcessOnceOnPrimary() throws Exception { } } Randomness.shuffle(operations); + final long oldTerm = randomLongBetween(1, Integer.MAX_VALUE); + primaryTerm.set(oldTerm); try (Store store = createStore(shardId, indexSettings, newDirectory())) { final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry()); try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { followingEngine.advanceMaxSeqNoOfUpdatesOrDeletes(operations.size() - 1L); - final long oldTerm = randomLongBetween(1, Integer.MAX_VALUE); final Map operationWithTerms = new HashMap<>(); for (Engine.Operation op : operations) { long term = randomLongBetween(1, oldTerm); Engine.Result result = applyOperation(followingEngine, op, term, randomFrom(Engine.Operation.Origin.values())); assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS)); operationWithTerms.put(op.seqNo(), term); + if (rarely()) { + 
followingEngine.refresh("test"); + } } // Primary should reject duplicates final long newTerm = randomLongBetween(oldTerm + 1, Long.MAX_VALUE); From 2a45bf277968fbc702efd451564acc2a13cef03e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 17 Oct 2018 09:42:49 -0400 Subject: [PATCH 8/9] move it back to FollowingEngine --- .../lucene/uid/VersionsAndSeqNoResolver.java | 44 ------------------- .../index/engine/EngineTestCase.java | 28 ++++++++++++ .../ccr/index/engine/FollowingEngine.java | 39 +++++++++++++--- .../index/engine/FollowingEngineTests.java | 5 ++- 4 files changed, 66 insertions(+), 50 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java index 057c50d406685..9db7e3716d51a 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java @@ -19,21 +19,12 @@ package org.elasticsearch.common.lucene.uid; -import org.apache.lucene.document.LongPoint; -import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.Term; -import org.apache.lucene.search.DocIdSetIterator; -import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.Query; -import org.apache.lucene.search.ScoreMode; -import org.apache.lucene.search.Scorer; -import org.apache.lucene.search.Weight; import org.apache.lucene.util.CloseableThreadLocal; -import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.mapper.SeqNoFieldMapper; @@ -202,39 +193,4 @@ public static long loadVersion(IndexReader reader, Term term) 
throws IOException final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term); return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version; } - - /** - * Looks up the primary term for a given seq_no in the provided directory reader. The caller must ensure that an operation with the - * given {@code seqNo} exists the provided {@code directoryReader}; otherwise this method will throw {@link IllegalStateException}. - */ - public static long lookupPrimaryTerm(final DirectoryReader directoryReader, final long seqNo) throws IOException { - final DirectoryReader reader = Lucene.wrapAllDocsLive(directoryReader); - final IndexSearcher searcher = new IndexSearcher(reader); - searcher.setQueryCache(null); - final Query query = LongPoint.newExactQuery(SeqNoFieldMapper.NAME, seqNo); - final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f); - // iterate backwards since the existing operation is likely in the most recent segments. - for (int i = reader.leaves().size() - 1; i >= 0; i--) { - final LeafReaderContext leaf = reader.leaves().get(i); - final Scorer scorer = weight.scorer(leaf); - if (scorer == null) { - continue; - } - final NumericDocValues primaryTermDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); - if (primaryTermDV == null) { - assert false : "seq_no[" + seqNo + "] does not have primary_term"; - throw new IllegalStateException("seq_no[" + seqNo + "] does not have primary_term"); - } - final DocIdSetIterator docIdSetIterator = scorer.iterator(); - int docId; - while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { - // make sure to skip the non-root nested documents - if (primaryTermDV.advanceExact(docId) && primaryTermDV.longValue() > 0) { - return primaryTermDV.longValue(); - } - } - } - assert false : "primary term for seq_no[" + seqNo + "] is not found"; - throw new IllegalStateException("primary term for seq_no[" + seqNo + "] is not found"); - } } diff 
--git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index bb1efd6997393..3e563e6d5382e 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -49,15 +49,20 @@ import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.AllocationId; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; @@ -65,6 +70,7 @@ import org.elasticsearch.index.MapperTestUtils; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; @@ -72,6 +78,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; +import 
org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.seqno.LocalCheckpointTracker; @@ -307,6 +314,27 @@ protected static ParsedDocument testParsedDocument( mappingUpdate); } + public static CheckedFunction nestedParsedDocFactory() throws Exception { + final MapperService mapperService = createMapperService("type"); + final String nestedMapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type") + .startObject("properties").startObject("nested_field").field("type", "nested").endObject().endObject() + .endObject().endObject()); + final DocumentMapper nestedMapper = mapperService.documentMapperParser().parse("type", new CompressedXContent(nestedMapping)); + return docId -> { + final XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("field", "value"); + final int nestedValues = between(0, 3); + if (nestedValues > 0) { + XContentBuilder nestedField = source.startObject("nested_field"); + for (int i = 0; i < nestedValues; i++) { + nestedField.field("field-" + i, "value-" + i); + } + source.endObject(); + } + source.endObject(); + return nestedMapper.parse(SourceToParse.source("test", "type", docId, BytesReference.bytes(source), XContentType.JSON)); + }; + } + /** * Creates a tombstone document that only includes uid, seq#, term and version fields. 
*/ diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 2ff893128d665..8c550ef46a22a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -5,11 +5,23 @@ */ package org.elasticsearch.xpack.ccr.index.engine; -import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.ReaderUtil; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.DocValuesFieldExistsQuery; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TopDocs; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.xpack.ccr.CcrSettings; @@ -131,16 +143,33 @@ protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) { private OptionalLong lookupPrimaryTerm(final long seqNo) throws IOException { refreshIfNeeded("lookup_primary_term", seqNo); - try (Searcher searcher = acquireSearcher("lookup_primary_term", SearcherScope.INTERNAL)) { + try (Searcher engineSearcher = acquireSearcher("lookup_primary_term", SearcherScope.INTERNAL)) { // We have to acquire a searcher before execute this check 
to ensure that the requesting seq_no is always found in the else // branch. If the operation is at most the global checkpoint, we should not look up its term as we may have merged away the // operation. Moreover, we won't need to replicate this operation to replicas since it was processed on every copies already. if (seqNo <= engineConfig.getGlobalCheckpointSupplier().getAsLong()) { return OptionalLong.empty(); } else { - final long term = VersionsAndSeqNoResolver.lookupPrimaryTerm(searcher.getDirectoryReader(), seqNo); - assert term > 0L : "seq_no=" + seqNo + " term=" + term; - return OptionalLong.of(term); + final DirectoryReader reader = Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()); + final IndexSearcher searcher = new IndexSearcher(reader); + searcher.setQueryCache(null); + final Query query = new BooleanQuery.Builder() + .add(LongPoint.newExactQuery(SeqNoFieldMapper.NAME, seqNo), BooleanClause.Occur.FILTER) + // excludes the non-root nested documents which don't have primary_term. 
+ .add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.FILTER) + .build(); + final TopDocs topDocs = searcher.search(query, 1); + if (topDocs.scoreDocs.length == 1) { + final int docId = topDocs.scoreDocs[0].doc; + final LeafReaderContext leaf = reader.leaves().get(ReaderUtil.subIndex(docId, reader.leaves())); + final NumericDocValues primaryTermDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); + if (primaryTermDV != null && primaryTermDV.advanceExact(docId - leaf.docBase)) { + assert primaryTermDV.longValue() > 0 : "invalid term [" + primaryTermDV.longValue() + "]"; + return OptionalLong.of(primaryTermDV.longValue()); + } + } + assert false : "seq_no[" + seqNo + "] does not have primary_term"; + throw new IllegalStateException("seq_no[" + seqNo + "] does not have primary_term"); } } catch (IOException e) { try { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 6d63d06d4993c..32d1c0e91b5e7 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.CheckedBiConsumer; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -561,10 +562,12 @@ public void testProcessOnceOnPrimary() throws Exception { .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build(); final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build(); final IndexSettings 
indexSettings = new IndexSettings(indexMetaData, settings); + final CheckedFunction nestedDocFactory = EngineTestCase.nestedParsedDocFactory(); int numOps = between(10, 100); List operations = new ArrayList<>(numOps); for (int i = 0; i < numOps; i++) { - ParsedDocument doc = EngineTestCase.createParsedDoc(Integer.toString(between(1, 100)), null); + String docId = Integer.toString(between(1, 100)); + ParsedDocument doc = randomBoolean() ? EngineTestCase.createParsedDoc(docId, null) : nestedDocFactory.apply(docId); if (randomBoolean()) { operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, i, primaryTerm.get(), 1L, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true)); From ab21a58fc1ca1aff3b164193e6e691f9fa1fb4e1 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 18 Oct 2018 05:55:33 -0400 Subject: [PATCH 9/9] =?UTF-8?q?boaz=E2=80=99s=20feedback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bulk/TransportBulkShardOperationsAction.java | 6 ++++-- .../AlreadyProcessedFollowingEngineException.java | 4 ++++ .../xpack/ccr/index/engine/FollowingEngine.java | 4 ++-- .../xpack/ccr/index/engine/FollowingEngineTests.java | 11 +++++++++-- 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 15f8c6cf97f5a..4a4b4648776b1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -137,8 +137,10 @@ public static CcrWritePrimaryResult shardOperationOnPrimary( assert failure.getSeqNo() == targetOp.seqNo() : targetOp.seqNo() + " != " + 
failure.getSeqNo(); if (failure.getExistingPrimaryTerm().isPresent()) { appliedOperations.add(rewriteOperationWithPrimaryTerm(sourceOp, failure.getExistingPrimaryTerm().getAsLong())); - } else { - assert targetOp.seqNo() <= primary.getGlobalCheckpoint() : targetOp.seqNo() + " > " + primary.getGlobalCheckpoint(); + } else if (targetOp.seqNo() > primary.getGlobalCheckpoint()) { + assert false : "can't find primary_term for existing op=" + targetOp + " gcp=" + primary.getGlobalCheckpoint(); + throw new IllegalStateException("can't find primary_term for existing op=" + targetOp + + " global_checkpoint=" + primary.getGlobalCheckpoint(), failure); } } else { assert false : "Only already-processed error should happen; op=[" + targetOp + "] error=[" + result.getFailure() + "]"; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java index d19bd4b63628e..3033ba31c8253 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java @@ -11,6 +11,10 @@ import java.util.OptionalLong; +/** + * An exception indicating that an operation was already processed on the {@link FollowingEngine} of the primary of a follower. + * The field {@code existingPrimaryTerm} is empty only if the operation is at most the global checkpoint; otherwise it should be non-empty.
+ */ public final class AlreadyProcessedFollowingEngineException extends VersionConflictEngineException { private final long seqNo; private final OptionalLong existingPrimaryTerm; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 8c550ef46a22a..84aa141c80d1d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -168,8 +168,8 @@ private OptionalLong lookupPrimaryTerm(final long seqNo) throws IOException { return OptionalLong.of(primaryTermDV.longValue()); } } - assert false : "seq_no[" + seqNo + "] does not have primary_term"; - throw new IllegalStateException("seq_no[" + seqNo + "] does not have primary_term"); + assert false : "seq_no[" + seqNo + "] does not have primary_term, total_hits=[" + topDocs.totalHits + "]"; + throw new IllegalStateException("seq_no[" + seqNo + "] does not have primary_term (total_hits=" + topDocs.totalHits + ")"); } } catch (IOException e) { try { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 32d1c0e91b5e7..5d27c786ad478 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -70,6 +70,7 @@ public class FollowingEngineTests extends ESTestCase { private Index index; private ShardId shardId; private AtomicLong primaryTerm = new AtomicLong(); + private AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); public void setUp() throws Exception { super.setUp(); @@ -263,7 +264,7 
@@ public void onFailedEngine(String reason, Exception e) { Collections.emptyList(), null, new NoneCircuitBreakerService(), - () -> SequenceNumbers.NO_OPS_PERFORMED, + globalCheckpoint::longValue, () -> primaryTerm.get(), EngineTestCase.tombstoneDocSupplier() ); @@ -594,13 +595,19 @@ public void testProcessOnceOnPrimary() throws Exception { } } // Primary should reject duplicates + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), followingEngine.getLocalCheckpoint())); final long newTerm = randomLongBetween(oldTerm + 1, Long.MAX_VALUE); for (Engine.Operation op : operations) { Engine.Result result = applyOperation(followingEngine, op, newTerm, Engine.Operation.Origin.PRIMARY); assertThat(result.getResultType(), equalTo(Engine.Result.Type.FAILURE)); assertThat(result.getFailure(), instanceOf(AlreadyProcessedFollowingEngineException.class)); AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure(); - assertThat(failure.getExistingPrimaryTerm().getAsLong(), equalTo(operationWithTerms.get(op.seqNo()))); + if (op.seqNo() <= globalCheckpoint.get()) { + assertThat("should not look-up term for operations at most the global checkpoint", + failure.getExistingPrimaryTerm().isPresent(), equalTo(false)); + } else { + assertThat(failure.getExistingPrimaryTerm().getAsLong(), equalTo(operationWithTerms.get(op.seqNo()))); + } } for (DocIdSeqNoAndTerm docId : getDocIds(followingEngine, true)) { assertThat(docId.getPrimaryTerm(), equalTo(operationWithTerms.get(docId.getSeqNo())));