diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
index c45517e956728..460501c8b5238 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -1834,6 +1834,33 @@ public interface TranslogRecoveryRunner {
* Returns the maximum sequence number of either update or delete operations have been processed in this engine
* or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
* as an update operation if it overwrites the existing documents in Lucene index with the same document id.
+ *
+ * A note on the optimization using max_seq_no_of_updates_or_deletes:
+ * For each operation O, the key invariants are:
+ *
+ * - I1: There is no operation on docID(O) with seqno that is {@literal > MSU(O) and < seqno(O)}
+ * - I2: If {@literal MSU(O) < seqno(O)} then docID(O) did not exist when O was applied; more precisely, if there is any O'
+ * with {@literal seqno(O') < seqno(O) and docID(O') = docID(O)} then the one with the greatest seqno is a delete.
+ *
+ * When a receiving shard (either a replica or a follower) receives an operation O, it must first ensure its own MSU is at least MSU(O),
+ * and then compare its MSU to its local checkpoint (LCP). If {@literal LCP < MSU} then there's a gap: there may be some operations
+ * that act on docID(O) about which we do not yet know, so we cannot perform an add. Note that this also covers the case where a future
+ * operation O' with {@literal seqno(O') > seqno(O) and docID(O') = docID(O)} is processed before O. In that case MSU(O') is at least
+ * seqno(O') and this means {@literal MSU >= seqno(O') > seqno(O) > LCP} (because O has not been processed yet).
+ *
+ * However, if {@literal MSU <= LCP} then there is no gap: we have processed every operation with seqno {@literal <= LCP}, and no operation O'
+ * with {@literal seqno(O') > LCP and seqno(O') < seqno(O)} also has docID(O') = docID(O), because such an operation would have
+ * {@literal seqno(O') > LCP >= MSU >= MSU(O)}, which contradicts the first invariant. Furthermore, in this case we immediately know
+ * that docID(O) has been deleted (or never existed) without needing to check Lucene for the following reason. If there's no earlier
+ * operation on docID(O) then this is clear, so suppose instead that the preceding operation on docID(O) is O':
+ * 1. The first invariant above tells us that {@literal seqno(O') <= MSU(O) <= LCP} so we have already applied O' to Lucene.
+ * 2. Also {@literal MSU(O) <= MSU <= LCP < seqno(O)} (we discard O if {@literal seqno(O) <= LCP}) so the second invariant applies,
+ * meaning that O' was a delete.
+ *
+ * Therefore, if {@literal MSU <= LCP < seqno(O)} we know that O can safely be optimized and added to Lucene with addDocument.
+ * Moreover, operations that are optimized using the MSU optimization must not be processed twice, as this would create duplicates
+ * in Lucene. To avoid this we check the local checkpoint tracker to see whether an operation has already been processed.
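+ *
+ * For example (an illustrative history on a single document, assuming no other updates or deletes are in flight): suppose
+ * docID "x" is indexed at seqno 3, deleted at seqno 5, and then indexed again by an append-only operation O at seqno 8, so
+ * that MSU(O) = 5. A shard whose LCP is 6 has already applied the delete at seqno 5; since
+ * {@literal MSU = 5 <= LCP = 6 < seqno(O) = 8} it can apply O with addDocument without looking up "x" in Lucene. A shard
+ * whose LCP is only 4 may still be missing that delete, so it must fall back to the regular, non-optimized indexing plan.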
*
* @see #initializeMaxSeqNoOfUpdatesOrDeletes()
* @see #advanceMaxSeqNoOfUpdatesOrDeletes(long)
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 85e7cebcb88fc..187b0eb1359a1 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -875,7 +875,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
* requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization.
*/
assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
- plan = IndexingStrategy.optimizedAppendOnly(index.seqNo());
+ plan = IndexingStrategy.optimizedAppendOnly(index.seqNo(), 1L);
} else {
if (appendOnlyRequest == false) {
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr));
@@ -927,7 +927,7 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc
plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L);
versionMap.enforceSafeAccess();
} else {
- plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index));
+ plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index), 1L);
}
} else {
versionMap.enforceSafeAccess();
@@ -1082,8 +1082,8 @@ private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpda
Optional.of(earlyResultOnPreFlightError);
}
- static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) {
- return new IndexingStrategy(true, false, true, false, seqNoForIndexing, 1, null);
+ public static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing, long versionForIndexing) {
+ return new IndexingStrategy(true, false, true, false, seqNoForIndexing, versionForIndexing, null);
}
static IndexingStrategy skipDueToVersionConflict(
@@ -1104,7 +1104,8 @@ static IndexingStrategy overrideExistingAsIfNotThere(
return new IndexingStrategy(true, true, true, false, seqNoForIndexing, versionForIndexing, null);
}
- static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing, long versionForIndexing) {
+ public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing,
+ long versionForIndexing) {
return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, seqNoForIndexing, versionForIndexing, null);
}
@@ -2331,6 +2332,16 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException {
localCheckpointTracker.waitForOpsToComplete(seqNo);
}
+ /**
+ * Checks whether the given operation has been processed in this engine.
+ * @return true if the given operation has been processed; otherwise false.
+ */
+ protected final boolean hasBeenProcessedBefore(Operation op) {
+ assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "operation is not assigned seq_no";
+ assert versionMap.assertKeyedLockHeldByCurrentThread(op.uid().bytes());
+ return localCheckpointTracker.contains(op.seqNo());
+ }
+
@Override
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
return localCheckpointTracker.getStats(globalCheckpoint);
diff --git a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java
index d0dd9466b6075..6d6340dd337af 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java
@@ -470,7 +470,7 @@ Releasable acquireLock(BytesRef uid) {
return keyedLock.acquire(uid);
}
- private boolean assertKeyedLockHeldByCurrentThread(BytesRef uid) {
+ boolean assertKeyedLockHeldByCurrentThread(BytesRef uid) {
assert keyedLock.isHeldByCurrentThread(uid) : "Thread [" + Thread.currentThread().getName() + "], uid [" + uid.utf8ToString() + "]";
return true;
}
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
index 1bbfb6fa73de3..12f0d645d8a87 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
@@ -575,11 +575,11 @@ protected static BytesArray bytesArray(String string) {
return new BytesArray(string.getBytes(Charset.defaultCharset()));
}
- protected static Term newUid(String id) {
+ public static Term newUid(String id) {
return new Term("_id", Uid.encodeId(id));
}
- protected Term newUid(ParsedDocument doc) {
+ public static Term newUid(ParsedDocument doc) {
return newUid(doc.id());
}
@@ -643,7 +643,7 @@ public static List<Engine.Operation> generateSingleDocHistory(boolean forReplica
throw new UnsupportedOperationException("unknown version type: " + versionType);
}
if (randomBoolean()) {
- op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), B_1, null),
+ op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), SOURCE, null),
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
version,
@@ -734,7 +734,7 @@ public static void assertOpsOnReplica(
}
}
- protected void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException {
+ public static void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngine engine) throws InterruptedException {
Thread[] thread = new Thread[randomIntBetween(3, 5)];
CountDownLatch startGun = new CountDownLatch(thread.length);
AtomicInteger offset = new AtomicInteger(-1);
@@ -877,7 +877,7 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e
}
}
- protected MapperService createMapperService(String type) throws IOException {
+ public static MapperService createMapperService(String type) throws IOException {
IndexMetaData indexMetaData = IndexMetaData.builder("test")
.settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java
index 24ada3755cb2a..458461f3c8457 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java
@@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.ccr.index.engine;
+import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.InternalEngine;
@@ -18,6 +19,8 @@
*/
public final class FollowingEngine extends InternalEngine {
+ private final CounterMetric numOfOptimizedIndexing = new CounterMetric();
+
/**
* Construct a new following engine with the specified engine configuration.
*
@@ -51,7 +54,20 @@ private void preFlight(final Operation operation) {
@Override
protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException {
preFlight(index);
- return planIndexingAsNonPrimary(index);
+ // NOTE: refer to Engine#getMaxSeqNoOfUpdatesOrDeletes for an explanation of the optimization using sequence numbers.
+ final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
+ assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
+ if (hasBeenProcessedBefore(index)) {
+ return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());
+
+ } else if (maxSeqNoOfUpdatesOrDeletes <= getLocalCheckpoint()) {
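+ // this operation was not processed before, so its seq_no must be above the local checkpoint and therefore also above
+ // the max_seq_no_of_updates_or_deletes; per the reasoning documented on Engine#getMaxSeqNoOfUpdatesOrDeletes, the doc
+ // of this operation cannot currently exist in Lucene and we can index it with addDocument without a version lookup.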
+ assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : "seq_no[" + index.seqNo() + "] <= msu[" + maxSeqNoOfUpdatesOrDeletes + "]";
+ numOfOptimizedIndexing.inc();
+ return InternalEngine.IndexingStrategy.optimizedAppendOnly(index.seqNo(), index.version());
+
+ } else {
+ return planIndexingAsNonPrimary(index);
+ }
}
@Override
@@ -85,4 +101,11 @@ protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) {
return true;
}
+ /**
+ * Returns the number of indexing operations that have been optimized (bypassing the version lookup) using sequence numbers in this engine.
+ * This metric is not persisted and starts from 0 when the engine is opened.
+ */
+ public long getNumberOfOptimizedIndexing() {
+ return numOfOptimizedIndexing.count();
+ }
}
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java
index a429e92bcbd40..793aca83fee9e 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java
@@ -40,6 +40,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
@@ -52,6 +53,7 @@
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
+import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
@@ -210,7 +212,7 @@ public void testFollowIndex() throws Exception {
for (int i = 0; i < firstBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}
-
+ assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs);
unfollowIndex("index2");
client().execute(ResumeFollowAction.INSTANCE, followRequest.getFollowRequest()).get();
final int secondBatchNumDocs = randomIntBetween(2, 64);
@@ -234,6 +236,7 @@ public void testFollowIndex() throws Exception {
for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
assertBusy(assertExpectedDocumentRunnable(i));
}
+ assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs + secondBatchNumDocs);
unfollowIndex("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfPrimaryShards);
}
@@ -347,6 +350,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
assertThat(bulkProcessor.awaitClose(1L, TimeUnit.MINUTES), is(true));
assertSameDocCount("index1", "index2");
+ assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfShards,
+ client().prepareSearch("index2").get().getHits().totalHits);
unfollowIndex("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfShards);
}
@@ -436,6 +441,7 @@ public void testFollowIndexWithNestedField() throws Exception {
}
unfollowIndex("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 1);
+ assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), 1, numDocs);
}
public void testUnfollowNonExistingIndex() {
@@ -473,7 +479,7 @@ public void testFollowIndexMaxOperationSizeInBytes() throws Exception {
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureYellow("index1");
- final int numDocs = 1024;
+ final int numDocs = between(10, 1024);
logger.info("Indexing [{}] docs", numDocs);
for (int i = 0; i < numDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
@@ -499,6 +505,7 @@ public void testFollowIndexMaxOperationSizeInBytes() throws Exception {
}
unfollowIndex("index2");
assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), 1);
+ assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), 1, numDocs);
}
public void testDontFollowTheWrongIndex() throws Exception {
@@ -871,6 +878,27 @@ private void assertMaxSeqNoOfUpdatesIsTransferred(Index leaderIndex, Index follo
});
}
+ private void assertTotalNumberOfOptimizedIndexing(Index followerIndex, int numberOfShards, long expectedTotal) throws Exception {
+ assertBusy(() -> {
+ long[] numOfOptimizedOps = new long[numberOfShards];
+ for (int shardId = 0; shardId < numberOfShards; shardId++) {
+ for (String node : internalCluster().nodesInclude(followerIndex.getName())) {
+ IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
+ IndexShard shard = indicesService.getShardOrNull(new ShardId(followerIndex, shardId));
+ if (shard != null) {
+ try {
+ FollowingEngine engine = ((FollowingEngine) IndexShardTestCase.getEngine(shard));
+ numOfOptimizedOps[shardId] = engine.getNumberOfOptimizedIndexing();
+ } catch (AlreadyClosedException e) {
+ throw new AssertionError(e); // causes assertBusy to retry
+ }
+ }
+ }
+ }
+ assertThat(Arrays.stream(numOfOptimizedOps).sum(), equalTo(expectedTotal));
+ });
+ }
+
public static PutFollowAction.Request follow(String leaderIndex, String followerIndex) {
return new PutFollowAction.Request(resumeFollow(leaderIndex, followerIndex));
}
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
index 2009d74f7c707..0011a091dbd3f 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
@@ -30,6 +30,7 @@
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
+import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
import java.io.IOException;
@@ -72,6 +73,9 @@ public void testSimpleCcrReplication() throws Exception {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint()));
followerGroup.assertAllEqual(indexedDocIds.size());
});
+ for (IndexShard shard : followerGroup) {
+ assertThat(((FollowingEngine) (getEngine(shard))).getNumberOfOptimizedIndexing(), equalTo((long) docCount));
+ }
// Deletes should be replicated to the follower
List<String> deleteDocIds = randomSubsetOf(indexedDocIds);
for (String deleteId : deleteDocIds) {
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
index c4a929969d240..ce67cfe2d4484 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
@@ -6,8 +6,6 @@
package org.elasticsearch.xpack.ccr.index.engine;
import org.apache.logging.log4j.Logger;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
@@ -16,13 +14,11 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.CheckedBiConsumer;
-import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
@@ -30,11 +26,10 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineTestCase;
+import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.TranslogHandler;
-import org.elasticsearch.index.mapper.IdFieldMapper;
-import org.elasticsearch.index.mapper.ParseContext;
+import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
-import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
@@ -48,12 +43,20 @@
import java.io.IOException;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import static org.elasticsearch.index.engine.EngineTestCase.getDocIds;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasToString;
public class FollowingEngineTests extends ESTestCase {
@@ -149,7 +152,7 @@ public void runIndexTest(
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
try (FollowingEngine followingEngine = createEngine(store, engineConfig)) {
- final Engine.Index index = createIndexOp("id", seqNo, origin);
+ final Engine.Index index = indexForFollowing("id", seqNo, origin);
consumer.accept(followingEngine, index);
}
}
@@ -213,7 +216,7 @@ public void testDoNotFillSeqNoGaps() throws Exception {
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
try (FollowingEngine followingEngine = createEngine(store, engineConfig)) {
- followingEngine.index(createIndexOp("id", 128, Engine.Operation.Origin.PRIMARY));
+ followingEngine.index(indexForFollowing("id", 128, Engine.Operation.Origin.PRIMARY));
int addedNoops = followingEngine.fillSeqNoGaps(primaryTerm.get());
assertThat(addedNoops, equalTo(0));
}
@@ -278,49 +281,254 @@ private FollowingEngine createEngine(Store store, EngineConfig config) throws IO
return followingEngine;
}
- private Engine.Index createIndexOp(String id, long seqNo, Engine.Operation.Origin origin) {
- final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE);
- final String type = "type";
- final Field versionField = new NumericDocValuesField("_version", 0);
- final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
- final ParseContext.Document document = new ParseContext.Document();
- document.add(uidField);
- document.add(versionField);
- document.add(seqID.seqNo);
- document.add(seqID.seqNoDocValue);
- document.add(seqID.primaryTerm);
- final BytesReference source = new BytesArray(new byte[]{1});
- final ParsedDocument parsedDocument = new ParsedDocument(
- versionField,
- seqID,
- id,
- type,
- "routing",
- Collections.singletonList(document),
- source,
- XContentType.JSON,
- null);
-
- final long version;
- final long autoGeneratedIdTimestamp;
- if (randomBoolean()) {
- version = 1;
- autoGeneratedIdTimestamp = System.currentTimeMillis();
- } else {
- version = randomNonNegativeLong();
- autoGeneratedIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
+ private Engine.Index indexForFollowing(String id, long seqNo, Engine.Operation.Origin origin) {
+ final long version = randomBoolean() ? 1 : randomNonNegativeLong();
+ final ParsedDocument parsedDocument = EngineTestCase.createParsedDoc(id, null);
+ return new Engine.Index(EngineTestCase.newUid(parsedDocument), parsedDocument, seqNo, primaryTerm.get(), version,
+ VersionType.EXTERNAL, origin, System.currentTimeMillis(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, randomBoolean());
+ }
+
+ private Engine.Index indexForPrimary(String id) {
+ final ParsedDocument parsedDoc = EngineTestCase.createParsedDoc(id, null);
+ return new Engine.Index(EngineTestCase.newUid(parsedDoc), primaryTerm.get(), parsedDoc);
+ }
+
+ private Engine.Delete deleteForPrimary(String id) {
+ final ParsedDocument parsedDoc = EngineTestCase.createParsedDoc(id, null);
+ return new Engine.Delete(parsedDoc.type(), parsedDoc.id(), EngineTestCase.newUid(parsedDoc), primaryTerm.get());
+ }
+
+ public void testBasicOptimization() throws Exception {
+ runFollowTest((leader, follower) -> {
+ long numDocs = between(1, 100);
+ for (int i = 0; i < numDocs; i++) {
+ leader.index(indexForPrimary(Integer.toString(i)));
+ }
+ follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+ assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L));
+ assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs));
+ assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true)));
+
+ // Do not apply optimization for deletes or updates
+ for (int i = 0; i < numDocs; i++) {
+ if (randomBoolean()) {
+ leader.index(indexForPrimary(Integer.toString(i)));
+ } else if (randomBoolean()) {
+ leader.delete(deleteForPrimary(Integer.toString(i)));
+ }
+ }
+ follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+ assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes()));
+ assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs));
+ assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true)));
+ // Apply optimization for documents that do not exist
+ long moreDocs = between(1, 100);
+ Set<String> docIds = getDocIds(follower, true).stream().map(doc -> doc.getId()).collect(Collectors.toSet());
+ for (int i = 0; i < moreDocs; i++) {
+ String docId = randomValueOtherThanMany(docIds::contains, () -> Integer.toString(between(1, 1000)));
+ docIds.add(docId);
+ leader.index(indexForPrimary(docId));
+ }
+ follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+ assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes()));
+ assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs + moreDocs));
+ assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true)));
+ });
+ }
+
+ public void testOptimizeAppendOnly() throws Exception {
+ int numOps = scaledRandomIntBetween(1, 1000);
+ List<Engine.Operation> ops = new ArrayList<>();
+ for (int i = 0; i < numOps; i++) {
+ ops.add(indexForPrimary(Integer.toString(i)));
}
- return new Engine.Index(
- new Term("_id", parsedDocument.id()),
- parsedDocument,
- seqNo,
- primaryTerm.get(),
- version,
- VersionType.EXTERNAL,
- origin,
- System.currentTimeMillis(),
- autoGeneratedIdTimestamp,
- randomBoolean());
+ runFollowTest((leader, follower) -> {
+ EngineTestCase.concurrentlyApplyOps(ops, leader);
+ assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L));
+ follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+ assertThat(follower.getNumberOfOptimizedIndexing(), equalTo((long) numOps));
+ });
}
+ public void testOptimizeMultipleVersions() throws Exception {
+ List<Engine.Operation> ops = new ArrayList<>();
+ for (int numOps = scaledRandomIntBetween(1, 1000), i = 0; i < numOps; i++) {
+ String id = Integer.toString(between(0, 100));
+ if (randomBoolean()) {
+ ops.add(indexForPrimary(id));
+ } else {
+ ops.add(deleteForPrimary(id));
+ }
+ }
+ Randomness.shuffle(ops);
+ runFollowTest((leader, follower) -> {
+ EngineTestCase.concurrentlyApplyOps(ops, leader);
+ follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+ final List<Engine.Operation> appendOps = new ArrayList<>();
+ for (int numAppends = scaledRandomIntBetween(0, 100), i = 0; i < numAppends; i++) {
+ appendOps.add(indexForPrimary("append-" + i));
+ }
+ EngineTestCase.concurrentlyApplyOps(appendOps, leader);
+ follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+ assertThat(follower.getNumberOfOptimizedIndexing(), greaterThanOrEqualTo((long) appendOps.size()));
+ });
+ }
+
+ public void testOptimizeSingleDocSequentially() throws Exception {
+ runFollowTest((leader, follower) -> {
+ leader.index(indexForPrimary("id"));
+ follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+ assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(1L));
+
+ leader.delete(deleteForPrimary("id"));
+ follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+ assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(1L));
+
+ leader.index(indexForPrimary("id"));
+ follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+ assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(2L));
+
+ leader.index(indexForPrimary("id"));
+ follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+ assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(2L));
+ });
+ }
+
+ public void testOptimizeSingleDocConcurrently() throws Exception {
+ List<Engine.Operation> ops = EngineTestCase.generateSingleDocHistory(false, randomFrom(VersionType.values()), 2, 10, 500, "id");
+ Randomness.shuffle(ops);
+ runFollowTest((leader, follower) -> {
+ EngineTestCase.concurrentlyApplyOps(ops, leader);
+ follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+ assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true)));
+ long numOptimized = follower.getNumberOfOptimizedIndexing();
+
+ leader.delete(deleteForPrimary("id"));
+ follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+ assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized));
+
+ leader.index(indexForPrimary("id"));
+ follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+ assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized + 1L));
+
+ leader.index(indexForPrimary("id"));
+ follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+ assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized + 1L));
+ });
+ }
+
+ private void runFollowTest(CheckedBiConsumer<InternalEngine, FollowingEngine, Exception> task) throws Exception {
+ final CheckedBiConsumer<InternalEngine, FollowingEngine, Exception> wrappedTask = (leader, follower) -> {
+ Thread[] threads = new Thread[between(1, 8)];
+ AtomicBoolean taskIsCompleted = new AtomicBoolean();
+ AtomicLong lastFetchedSeqNo = new AtomicLong(follower.getLocalCheckpoint());
+ CountDownLatch latch = new CountDownLatch(threads.length + 1);
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new Thread(() -> {
+ try {
+ latch.countDown();
+ latch.await();
+ fetchOperations(taskIsCompleted, lastFetchedSeqNo, leader, follower);
+ } catch (Exception e) {
+ throw new AssertionError(e);
+ }
+ });
+ threads[i].start();
+ }
+ try {
+ latch.countDown();
+ latch.await();
+ task.accept(leader, follower);
+ follower.waitForOpsToComplete(leader.getLocalCheckpoint());
+ } finally {
+ taskIsCompleted.set(true);
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes()));
+ assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true)));
+ }
+ };
+
+ Settings leaderSettings = Settings.builder()
+ .put("index.number_of_shards", 1).put("index.number_of_replicas", 0)
+ .put("index.version.created", Version.CURRENT).put("index.soft_deletes.enabled", true).build();
+ IndexMetaData leaderIndexMetaData = IndexMetaData.builder(index.getName()).settings(leaderSettings).build();
+ IndexSettings leaderIndexSettings = new IndexSettings(leaderIndexMetaData, leaderSettings);
+ try (Store leaderStore = createStore(shardId, leaderIndexSettings, newDirectory())) {
+ leaderStore.createEmpty();
+ EngineConfig leaderConfig = engineConfig(shardId, leaderIndexSettings, threadPool, leaderStore, logger, xContentRegistry());
+ leaderStore.associateIndexWithNewTranslog(Translog.createEmptyTranslog(
+ leaderConfig.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, 1L));
+ try (InternalEngine leaderEngine = new InternalEngine(leaderConfig)) {
+ leaderEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
+ leaderEngine.skipTranslogRecovery();
+ Settings followerSettings = Settings.builder()
+ .put("index.number_of_shards", 1).put("index.number_of_replicas", 0)
+ .put("index.version.created", Version.CURRENT).put("index.xpack.ccr.following_index", true).build();
+ IndexMetaData followerIndexMetaData = IndexMetaData.builder(index.getName()).settings(followerSettings).build();
+ IndexSettings followerIndexSettings = new IndexSettings(followerIndexMetaData, leaderSettings);
+ try (Store followerStore = createStore(shardId, followerIndexSettings, newDirectory())) {
+ EngineConfig followerConfig = engineConfig(
+ shardId, followerIndexSettings, threadPool, followerStore, logger, xContentRegistry());
+ try (FollowingEngine followingEngine = createEngine(followerStore, followerConfig)) {
+ wrappedTask.accept(leaderEngine, followingEngine);
+ }
+ }
+ }
+ }
+ }
+
+ private void fetchOperations(AtomicBoolean stopped, AtomicLong lastFetchedSeqNo,
+ InternalEngine leader, FollowingEngine follower) throws IOException {
+ final MapperService mapperService = EngineTestCase.createMapperService("test");
+ final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), follower.config().getIndexSettings());
+ while (stopped.get() == false) {
+ final long checkpoint = leader.getLocalCheckpoint();
+ final long lastSeqNo = lastFetchedSeqNo.get();
+ if (lastSeqNo < checkpoint) {
+ final long nextSeqNo = randomLongBetween(lastSeqNo + 1, checkpoint);
+ if (lastFetchedSeqNo.compareAndSet(lastSeqNo, nextSeqNo)) {
+ // extend the fetch range so that we may deliver some overlapping operations more than once.
+ final long fromSeqNo = randomLongBetween(Math.max(lastSeqNo - 5, 0), lastSeqNo + 1);
+ final long toSeqNo = randomLongBetween(nextSeqNo, Math.min(nextSeqNo + 5, checkpoint));
+ try (Translog.Snapshot snapshot =
+ shuffleSnapshot(leader.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true))) {
+ follower.advanceMaxSeqNoOfUpdatesOrDeletes(leader.getMaxSeqNoOfUpdatesOrDeletes());
+ translogHandler.run(follower, snapshot);
+ }
+ }
+ }
+ }
+ }
+
+ private Translog.Snapshot shuffleSnapshot(Translog.Snapshot snapshot) throws IOException {
+ final List<Translog.Operation> operations = new ArrayList<>(snapshot.totalOperations());
+ Translog.Operation op;
+ while ((op = snapshot.next()) != null) {
+ operations.add(op);
+ }
+ Randomness.shuffle(operations);
+ final Iterator<Translog.Operation> iterator = operations.iterator();
+
+ return new Translog.Snapshot() {
+ @Override
+ public int totalOperations() {
+ return snapshot.totalOperations();
+ }
+
+ @Override
+ public Translog.Operation next() {
+ if (iterator.hasNext()) {
+ return iterator.next();
+ }
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ snapshot.close();
+ }
+ };
+ }
}