From be60aa632628ea00de388b159f9646b214d00fab Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 25 Sep 2018 16:51:11 -0400 Subject: [PATCH 01/10] CCR: Optimize indexing ops using seqno on follower This change introduces the indexing optimization using sequence numbers on the FollowingEngine. This optimization uses the max_seq_no_updates which is tracked on the primary of the leader, and replicated to replicas and followers. --- .../index/engine/InternalEngine.java | 21 +- .../index/engine/LiveVersionMap.java | 2 +- .../index/engine/EngineTestCase.java | 10 +- .../ccr/index/engine/FollowingEngine.java | 50 +++- .../xpack/ccr/ShardChangesIT.java | 28 +- .../ShardFollowTaskReplicationTests.java | 4 + .../index/engine/FollowingEngineTests.java | 280 ++++++++++++++---- 7 files changed, 330 insertions(+), 65 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 5bd1a473fb97b..e9f2fed276ee0 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -875,7 +875,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO * requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization. 
*/ assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]"; - plan = IndexingStrategy.optimizedAppendOnly(index.seqNo()); + plan = IndexingStrategy.optimizedAppendOnly(index.seqNo(), 1L); } else { if (appendOnlyRequest == false) { maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr)); @@ -927,7 +927,7 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L); versionMap.enforceSafeAccess(); } else { - plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index)); + plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index), 1L); } } else { versionMap.enforceSafeAccess(); @@ -1082,8 +1082,8 @@ private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpda Optional.of(earlyResultOnPreFlightError); } - static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) { - return new IndexingStrategy(true, false, true, false, seqNoForIndexing, 1, null); + public static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing, long versionForIndexing) { + return new IndexingStrategy(true, false, true, false, seqNoForIndexing, versionForIndexing, null); } static IndexingStrategy skipDueToVersionConflict( @@ -1104,7 +1104,8 @@ static IndexingStrategy overrideExistingAsIfNotThere( return new IndexingStrategy(true, true, true, false, seqNoForIndexing, versionForIndexing, null); } - static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing, long versionForIndexing) { + public static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing, + long versionForIndexing) { return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, seqNoForIndexing, versionForIndexing, null); } @@ -2331,6 +2332,16 @@ public void 
waitForOpsToComplete(long seqNo) throws InterruptedException { localCheckpointTracker.waitForOpsToComplete(seqNo); } + /** + * Checks if the given operation has been processed in this engine or not. + * @return true if the given operation was processed; otherwise false. + */ + protected boolean containsOperation(Operation op) { + assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "operation is not assigned seq_no"; + assert versionMap.assertKeyedLockHeldByCurrentThread(op.uid().bytes()); + return localCheckpointTracker.contains(op.seqNo()); + } + @Override public SeqNoStats getSeqNoStats(long globalCheckpoint) { return localCheckpointTracker.getStats(globalCheckpoint); diff --git a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index d0dd9466b6075..6d6340dd337af 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -470,7 +470,7 @@ Releasable acquireLock(BytesRef uid) { return keyedLock.acquire(uid); } - private boolean assertKeyedLockHeldByCurrentThread(BytesRef uid) { + boolean assertKeyedLockHeldByCurrentThread(BytesRef uid) { assert keyedLock.isHeldByCurrentThread(uid) : "Thread [" + Thread.currentThread().getName() + "], uid [" + uid.utf8ToString() + "]"; return true; } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 1bbfb6fa73de3..12f0d645d8a87 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -575,11 +575,11 @@ protected static BytesArray bytesArray(String string) { return new BytesArray(string.getBytes(Charset.defaultCharset())); } - protected static Term newUid(String id) { + public 
static Term newUid(String id) { return new Term("_id", Uid.encodeId(id)); } - protected Term newUid(ParsedDocument doc) { + public static Term newUid(ParsedDocument doc) { return newUid(doc.id()); } @@ -643,7 +643,7 @@ public static List generateSingleDocHistory(boolean forReplica throw new UnsupportedOperationException("unknown version type: " + versionType); } if (randomBoolean()) { - op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), B_1, null), + op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), SOURCE, null), forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, version, @@ -734,7 +734,7 @@ public static void assertOpsOnReplica( } } - protected void concurrentlyApplyOps(List ops, InternalEngine engine) throws InterruptedException { + public static void concurrentlyApplyOps(List ops, InternalEngine engine) throws InterruptedException { Thread[] thread = new Thread[randomIntBetween(3, 5)]; CountDownLatch startGun = new CountDownLatch(thread.length); AtomicInteger offset = new AtomicInteger(-1); @@ -877,7 +877,7 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e } } - protected MapperService createMapperService(String type) throws IOException { + public static MapperService createMapperService(String type) throws IOException { IndexMetaData indexMetaData = IndexMetaData.builder("test") .settings(Settings.builder() .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 24ada3755cb2a..b57c5d802e3b4 100644 --- 
a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ccr.index.engine; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.InternalEngine; @@ -18,6 +19,8 @@ */ public final class FollowingEngine extends InternalEngine { + private final CounterMetric numOfOptimizedIndexing = new CounterMetric(); + /** * Construct a new following engine with the specified engine configuration. * @@ -51,7 +54,45 @@ private void preFlight(final Operation operation) { @Override protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException { preFlight(index); - return planIndexingAsNonPrimary(index); + /* + * A NOTE ABOUT OPTIMIZATION USING SEQUENCE NUMBERS: + * + * 1. Indexing operations are processed concurrently in an engine. However, operations of the same docID are processed + * one by one under the docID lock. + * + * 2. An engine itself can resolve correctly if an operation is delivered multiple times. However, if an operation is + * optimized and delivered multiple, it will be appended into Lucene more than once. We void this issue by never + * optimizing an operation if it was processed in the engine (using LocalCheckpointTracker). + * + * 3. When replicating operations to replicas or followers, we also carry the max seq_no_of_updates_or_deletes on the + * leader to followers. This transfer guarantees the MUS on a follower when operation O is processed at least the + * MUS on the leader when it was executed. + * + * 4. 
The following proves that docID(O) does not exist on a follower when operation O is applied if MSU(O) <= LCP < seqno(O): + * + * 4.1) If such operation O' with docID(O’) = docID(O), and LCP < seqno(O’), then MSU(O) >= MSU(O') because O' was + * delivered to the follower before O. MUS(0') on the leader is at least seqno(O) or seqno(0') and both > LCP. + * This contradicts the assumption [MSU(O) <= LCP]. + * + * 4.2) MSU(O) < seqno(O) then docID(O) does not exist when O is applied on a leader. This means docID(O) does not exist + * after we apply every operation with docID = docID(O) and seqno < seqno(O). On the follower, we have applied every + * operation with seqno <= LCP, and there is no such O' with docID(O’) = docID(O) and LCP < seqno(O’)[4.1]. + * These mean the follower has applied every operation with docID = docID(O) and seqno < seqno(O). + * Thus docID(O) does not exist on the follower. + */ + final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes(); + assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; + if (containsOperation(index)) { + return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); + + } else if (maxSeqNoOfUpdatesOrDeletes <= getLocalCheckpoint()) { + assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : "seq_no[" + index.seqNo() + "] <= msu[" + maxSeqNoOfUpdatesOrDeletes + "]"; + numOfOptimizedIndexing.inc(); + return InternalEngine.IndexingStrategy.optimizedAppendOnly(index.seqNo(), index.version()); + + } else { + return planIndexingAsNonPrimary(index); + } } @Override @@ -85,4 +126,11 @@ protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) { return true; } + /** + * Returns the number of indexing operations that have been optimized (bypass version lookup) using sequence numbers in this engine. + * This metric is not persisted, and started from 0 when the engine is opened. 
+ */ + public long getNumberOfOptimizedIndexing() { + return numOfOptimizedIndexing.count(); + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index c491b0231beb5..712db5357999f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; @@ -49,6 +50,7 @@ import org.elasticsearch.test.discovery.TestZenDiscovery; import org.elasticsearch.xpack.ccr.action.ShardChangesAction; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; +import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction; @@ -202,7 +204,7 @@ public void testFollowIndex() throws Exception { for (int i = 0; i < firstBatchNumDocs; i++) { assertBusy(assertExpectedDocumentRunnable(i)); } - + assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfPrimaryShards, firstBatchNumDocs); unfollowIndex("index2"); client().execute(FollowIndexAction.INSTANCE, followRequest).get(); final int secondBatchNumDocs = randomIntBetween(2, 64); @@ -226,6 +228,7 @@ public void testFollowIndex() throws Exception { for (int i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) { assertBusy(assertExpectedDocumentRunnable(i)); } + assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), 
numberOfPrimaryShards, firstBatchNumDocs + secondBatchNumDocs); unfollowIndex("index2"); assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfPrimaryShards); } @@ -342,6 +345,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) assertThat(bulkProcessor.awaitClose(1L, TimeUnit.MINUTES), is(true)); assertSameDocCount("index1", "index2"); + assertTotalNumberOfOptimizedIndexing(resolveIndex("index2"), numberOfShards, + client().prepareSearch("index2").get().getHits().totalHits); unfollowIndex("index2"); assertMaxSeqNoOfUpdatesIsTransferred(resolveIndex("index1"), resolveIndex("index2"), numberOfShards); } @@ -766,6 +771,27 @@ private void assertMaxSeqNoOfUpdatesIsTransferred(Index leaderIndex, Index follo }); } + private void assertTotalNumberOfOptimizedIndexing(Index followerIndex, int numberOfShards, long expectedTotal) throws Exception { + assertBusy(() -> { + long[] numOfOptimizedOps = new long[numberOfShards]; + for (int shardId = 0; shardId < numberOfShards; shardId++) { + for (String node : internalCluster().nodesInclude(followerIndex.getName())) { + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + IndexShard shard = indicesService.getShardOrNull(new ShardId(followerIndex, shardId)); + if (shard != null) { + try { + FollowingEngine engine = ((FollowingEngine) IndexShardTestCase.getEngine(shard)); + numOfOptimizedOps[shardId] = engine.getNumberOfOptimizedIndexing(); + } catch (AlreadyClosedException e) { + throw new AssertionError(e); // causes assertBusy to retry + } + } + } + } + assertThat(Arrays.stream(numOfOptimizedOps).sum(), equalTo(expectedTotal)); + }); + } + public static FollowIndexAction.Request createFollowRequest(String leaderIndex, String followerIndex) { FollowIndexAction.Request request = new FollowIndexAction.Request(); request.setLeaderIndex(leaderIndex); diff --git 
a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 2009d74f7c707..0011a091dbd3f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; +import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; import java.io.IOException; @@ -72,6 +73,9 @@ public void testSimpleCcrReplication() throws Exception { assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint())); followerGroup.assertAllEqual(indexedDocIds.size()); }); + for (IndexShard shard : followerGroup) { + assertThat(((FollowingEngine) (getEngine(shard))).getNumberOfOptimizedIndexing(), equalTo((long) docCount)); + } // Deletes should be replicated to the follower List deleteDocIds = randomSubsetOf(indexedDocIds); for (String deleteId : deleteDocIds) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index c4a929969d240..8f6cec1ea2806 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -6,8 +6,6 @@ package org.elasticsearch.xpack.ccr.index.engine; import 
org.apache.logging.log4j.Logger; -import org.apache.lucene.document.Field; -import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; @@ -16,13 +14,11 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.CheckedBiConsumer; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; @@ -30,11 +26,10 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineTestCase; +import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.TranslogHandler; -import org.elasticsearch.index.mapper.IdFieldMapper; -import org.elasticsearch.index.mapper.ParseContext; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParsedDocument; -import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; @@ -48,12 +43,19 @@ import java.io.IOException; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import static org.elasticsearch.index.engine.EngineTestCase.getDocIds; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasToString; public class FollowingEngineTests extends ESTestCase { @@ -149,7 +151,7 @@ public void runIndexTest( try (Store store = createStore(shardId, indexSettings, newDirectory())) { final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry()); try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { - final Engine.Index index = createIndexOp("id", seqNo, origin); + final Engine.Index index = indexForFollowing("id", seqNo, origin); consumer.accept(followingEngine, index); } } @@ -213,7 +215,7 @@ public void testDoNotFillSeqNoGaps() throws Exception { try (Store store = createStore(shardId, indexSettings, newDirectory())) { final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry()); try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { - followingEngine.index(createIndexOp("id", 128, Engine.Operation.Origin.PRIMARY)); + followingEngine.index(indexForFollowing("id", 128, Engine.Operation.Origin.PRIMARY)); int addedNoops = followingEngine.fillSeqNoGaps(primaryTerm.get()); assertThat(addedNoops, equalTo(0)); } @@ -278,49 +280,223 @@ private FollowingEngine createEngine(Store store, EngineConfig config) throws IO return followingEngine; } - private Engine.Index createIndexOp(String id, long seqNo, Engine.Operation.Origin origin) { - final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE); - final String type = "type"; - final Field versionField = new NumericDocValuesField("_version", 0); - final SeqNoFieldMapper.SequenceIDFields seqID = 
SeqNoFieldMapper.SequenceIDFields.emptySeqID(); - final ParseContext.Document document = new ParseContext.Document(); - document.add(uidField); - document.add(versionField); - document.add(seqID.seqNo); - document.add(seqID.seqNoDocValue); - document.add(seqID.primaryTerm); - final BytesReference source = new BytesArray(new byte[]{1}); - final ParsedDocument parsedDocument = new ParsedDocument( - versionField, - seqID, - id, - type, - "routing", - Collections.singletonList(document), - source, - XContentType.JSON, - null); - - final long version; - final long autoGeneratedIdTimestamp; - if (randomBoolean()) { - version = 1; - autoGeneratedIdTimestamp = System.currentTimeMillis(); - } else { - version = randomNonNegativeLong(); - autoGeneratedIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; + private Engine.Index indexForFollowing(String id, long seqNo, Engine.Operation.Origin origin) { + final long version = randomBoolean() ? 1 : randomNonNegativeLong(); + final ParsedDocument parsedDocument = EngineTestCase.createParsedDoc(id, null); + return new Engine.Index(EngineTestCase.newUid(parsedDocument), parsedDocument, seqNo, primaryTerm.get(), version, + VersionType.EXTERNAL, origin, System.currentTimeMillis(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, randomBoolean()); + } + + private Engine.Index indexForPrimary(String id) { + final ParsedDocument parsedDoc = EngineTestCase.createParsedDoc(id, null); + return new Engine.Index(EngineTestCase.newUid(parsedDoc), primaryTerm.get(), parsedDoc); + } + + private Engine.Delete deleteForPrimary(String id) { + final ParsedDocument parsedDoc = EngineTestCase.createParsedDoc(id, null); + return new Engine.Delete(parsedDoc.type(), parsedDoc.id(), EngineTestCase.newUid(parsedDoc), primaryTerm.get()); + } + + public void testBasicOptimization() throws Exception { + runFollowTest((leader, follower) -> { + long numDocs = between(1, 100); + for (int i = 0; i < numDocs; i++) { + 
leader.index(indexForPrimary(Integer.toString(i))); + } + follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L)); + assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs)); + assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); + + // Do not apply optimization for deletes or updates + for (int i = 0; i < numDocs; i++) { + if (randomBoolean()) { + leader.index(indexForPrimary(Integer.toString(i))); + } else if (randomBoolean()) { + leader.delete(deleteForPrimary(Integer.toString(i))); + } + } + follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes())); + assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs)); + assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); + // Apply optimization for documents that do not exist + long moreDocs = between(1, 100); + Set docIds = getDocIds(follower, true).stream().map(doc -> doc.getId()).collect(Collectors.toSet()); + for (int i = 0; i < moreDocs; i++) { + String docId = randomValueOtherThanMany(docIds::contains, () -> Integer.toString(between(1, 1000))); + docIds.add(docId); + leader.index(indexForPrimary(docId)); + } + follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes())); + assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs + moreDocs)); + assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); + }); + } + + public void testOptimizeAppendOnly() throws Exception { + int numOps = scaledRandomIntBetween(1, 1000); + List ops = new ArrayList<>(); + for (int i = 0; i < numOps; i++) { + ops.add(indexForPrimary(Integer.toString(i))); + } + runFollowTest((leader, follower) -> { + EngineTestCase.concurrentlyApplyOps(ops, leader); + 
assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L)); + follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + assertThat(follower.getNumberOfOptimizedIndexing(), equalTo((long) numOps)); + }); + } + + public void testOptimizeMultipleVersions() throws Exception { + List ops = new ArrayList<>(); + for (int numOps = scaledRandomIntBetween(1, 1000), i = 0; i < numOps; i++) { + String id = Integer.toString(between(0, 100)); + if (randomBoolean()) { + ops.add(indexForPrimary(id)); + } else { + ops.add(deleteForPrimary(id)); + } + } + Randomness.shuffle(ops); + runFollowTest((leader, follower) -> { + EngineTestCase.concurrentlyApplyOps(ops, leader); + follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + final List appendOps = new ArrayList<>(); + for (int numAppends = scaledRandomIntBetween(0, 100), i = 0; i < numAppends; i++) { + appendOps.add(indexForPrimary("append-" + i)); + } + EngineTestCase.concurrentlyApplyOps(appendOps, leader); + follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + assertThat(follower.getNumberOfOptimizedIndexing(), greaterThanOrEqualTo((long) appendOps.size())); + }); + } + + public void testOptimizeSingleDocSequentially() throws Exception { + runFollowTest((leader, follower) -> { + leader.index(indexForPrimary("id")); + follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(1L)); + + leader.delete(deleteForPrimary("id")); + follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(1L)); + + leader.index(indexForPrimary("id")); + follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(2L)); + + leader.index(indexForPrimary("id")); + follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(2L)); + }); + } + + public void 
testOptimizeSingleDocConcurrently() throws Exception { + List ops = EngineTestCase.generateSingleDocHistory(false, randomFrom(VersionType.values()), 2, 10, 500, "id"); + Randomness.shuffle(ops); + runFollowTest((leader, follower) -> { + EngineTestCase.concurrentlyApplyOps(ops, leader); + follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); + long numOptimized = follower.getNumberOfOptimizedIndexing(); + + leader.delete(deleteForPrimary("id")); + follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized)); + + leader.index(indexForPrimary("id")); + follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized + 1L)); + + leader.index(indexForPrimary("id")); + follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized + 1L)); + }); + } + + private void runFollowTest(CheckedBiConsumer task) throws Exception { + final CheckedBiConsumer wrappedTask = (leader, follower) -> { + Thread[] threads = new Thread[between(1, 8)]; + AtomicBoolean taskIsCompleted = new AtomicBoolean(); + AtomicLong lastFetchedSeqNo = new AtomicLong(follower.getLocalCheckpoint()); + CountDownLatch latch = new CountDownLatch(threads.length + 1); + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(() -> { + try { + latch.countDown(); + latch.await(); + fetchOperations(taskIsCompleted, lastFetchedSeqNo, leader, follower); + } catch (Exception e) { + throw new AssertionError(e); + } + }); + threads[i].start(); + } + try { + latch.countDown(); + latch.await(); + task.accept(leader, follower); + follower.waitForOpsToComplete(leader.getLocalCheckpoint()); + } finally { + taskIsCompleted.set(true); + for (Thread thread : threads) { + thread.join(); + } + 
assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes())); + assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); + } + }; + + Settings leaderSettings = Settings.builder() + .put("index.number_of_shards", 1).put("index.number_of_replicas", 0) + .put("index.version.created", Version.CURRENT).put("index.soft_deletes.enabled", true).build(); + IndexMetaData leaderIndexMetaData = IndexMetaData.builder(index.getName()).settings(leaderSettings).build(); + IndexSettings leaderIndexSettings = new IndexSettings(leaderIndexMetaData, leaderSettings); + try (Store leaderStore = createStore(shardId, leaderIndexSettings, newDirectory())) { + leaderStore.createEmpty(); + EngineConfig leaderConfig = engineConfig(shardId, leaderIndexSettings, threadPool, leaderStore, logger, xContentRegistry()); + leaderStore.associateIndexWithNewTranslog(Translog.createEmptyTranslog( + leaderConfig.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, 1L)); + try (InternalEngine leaderEngine = new InternalEngine(leaderConfig)) { + leaderEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); + leaderEngine.skipTranslogRecovery(); + Settings followerSettings = Settings.builder() + .put("index.number_of_shards", 1).put("index.number_of_replicas", 0) + .put("index.version.created", Version.CURRENT).put("index.xpack.ccr.following_index", true).build(); + IndexMetaData followerIndexMetaData = IndexMetaData.builder(index.getName()).settings(followerSettings).build(); + IndexSettings followerIndexSettings = new IndexSettings(followerIndexMetaData, leaderSettings); + try (Store followerStore = createStore(shardId, followerIndexSettings, newDirectory())) { + EngineConfig followerConfig = engineConfig( + shardId, followerIndexSettings, threadPool, followerStore, logger, xContentRegistry()); + try (FollowingEngine followingEngine = createEngine(followerStore, followerConfig)) { + wrappedTask.accept(leaderEngine, 
followingEngine); + } + } + } } - return new Engine.Index( - new Term("_id", parsedDocument.id()), - parsedDocument, - seqNo, - primaryTerm.get(), - version, - VersionType.EXTERNAL, - origin, - System.currentTimeMillis(), - autoGeneratedIdTimestamp, - randomBoolean()); } + private void fetchOperations(AtomicBoolean stopped, AtomicLong lastFetchedSeqNo, + InternalEngine leader, FollowingEngine follower) throws IOException { + final MapperService mapperService = EngineTestCase.createMapperService("test"); + final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), follower.config().getIndexSettings()); + while (stopped.get() == false) { + final long checkpoint = leader.getLocalCheckpoint(); + final long lastSeqNo = lastFetchedSeqNo.get(); + if (lastSeqNo < checkpoint) { + final long nextSeqNo = randomLongBetween(lastSeqNo + 1, checkpoint); + if (lastFetchedSeqNo.compareAndSet(lastSeqNo, nextSeqNo)) { + // extends the fetch range so we may deliver some overlapping operations more than once. 
+ final long fromSeqNo = randomLongBetween(Math.max(lastSeqNo - 5, 0), lastSeqNo + 1); + final long toSeqNo = randomLongBetween(nextSeqNo, Math.min(nextSeqNo + 5, checkpoint)); + try (Translog.Snapshot snapshot = leader.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true)) { + follower.advanceMaxSeqNoOfUpdatesOrDeletes(leader.getMaxSeqNoOfUpdatesOrDeletes()); + translogHandler.run(follower, snapshot); + } + } + } + } + } } From 7d25e69b67dd605582942a15deb1f36a91db7b53 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 26 Sep 2018 20:29:57 -0400 Subject: [PATCH 02/10] wording --- .../xpack/ccr/index/engine/FollowingEngine.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index b57c5d802e3b4..082df760d8b21 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -55,14 +55,14 @@ private void preFlight(final Operation operation) { protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException { preFlight(index); /* - * A NOTE ABOUT OPTIMIZATION USING SEQUENCE NUMBERS: + * A note about optimization using sequence numbers: * * 1. Indexing operations are processed concurrently in an engine. However, operations of the same docID are processed * one by one under the docID lock. * * 2. An engine itself can resolve correctly if an operation is delivered multiple times. However, if an operation is - * optimized and delivered multiple, it will be appended into Lucene more than once. We void this issue by never - * optimizing an operation if it was processed in the engine (using LocalCheckpointTracker). 
+ * optimized and delivered multiple times, it will be appended into Lucene more than once. We void this issue by + * never optimizing an operation if it was processed in the engine (using LocalCheckpointTracker). * * 3. When replicating operations to replicas or followers, we also carry the max seq_no_of_updates_or_deletes on the * leader to followers. This transfer guarantees the MUS on a follower when operation O is processed at least the From 6cd688e232dc6069f9a9af4e74b2dc4f84b31bae Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 27 Sep 2018 21:46:39 -0400 Subject: [PATCH 03/10] feedback --- .../index/engine/InternalEngine.java | 2 +- .../ccr/index/engine/FollowingEngine.java | 23 +++++++++++-------- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index f6ca8ef349fcc..187b0eb1359a1 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2336,7 +2336,7 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException { * Checks if the given operation has been processed in this engine or not. * @return true if the given operation was processed; otherwise false. 
*/ - protected boolean containsOperation(Operation op) { + protected final boolean hasBeenProcessedBefore(Operation op) { assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "operation is not assigned seq_no"; assert versionMap.assertKeyedLockHeldByCurrentThread(op.uid().bytes()); return localCheckpointTracker.contains(op.seqNo()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 082df760d8b21..195db1c4c1aee 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -62,27 +62,32 @@ protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Ind * * 2. An engine itself can resolve correctly if an operation is delivered multiple times. However, if an operation is * optimized and delivered multiple times, it will be appended into Lucene more than once. We void this issue by - * never optimizing an operation if it was processed in the engine (using LocalCheckpointTracker). + * not executing operations which have been processed before (using LocalCheckpointTracker). * * 3. When replicating operations to replicas or followers, we also carry the max seq_no_of_updates_or_deletes on the * leader to followers. This transfer guarantees the MUS on a follower when operation O is processed at least the - * MUS on the leader when it was executed. + * MUS on the leader when it was executed [MSU_r(O) >= MSU_p(O)]. * - * 4. The following proves that docID(O) does not exist on a follower when operation O is applied if MSU(O) <= LCP < seqno(O): + * 4. 
The following proves that docID(O) does not exist on a follower when operation O is applied if MSU_r(O) <= LCP < seqno(O): * - * 4.1) If such operation O' with docID(O’) = docID(O), and LCP < seqno(O’), then MSU(O) >= MSU(O') because O' was - * delivered to the follower before O. MUS(0') on the leader is at least seqno(O) or seqno(0') and both > LCP. - * This contradicts the assumption [MSU(O) <= LCP]. + * 4.1) If such operation O' with docID(O’) = docID(O), and LCP < seqno(O) < seqno(O’), then MSU_r(O) >= MSU_r(O') because + * O' was delivered to the follower before O. MUS_p(O') is at least seqno(O) because O' on the primary captured + * that O was updated (either by O' or by O" with seqno(O) < seqno(O") < seqno(O')). + * Thus MUS_r(O) >= MSU_r(O') >= MSU_p(O') >= seqno(O) > LCP which contradicts the assumption [MSU_r(O) <= LCP]. * - * 4.2) MSU(O) < seqno(O) then docID(O) does not exist when O is applied on a leader. This means docID(O) does not exist + * 4.2) If such operation O' with docID(O') = docID(O), and LCP < seqno(O') < seqno(O), then MSU_p(O) is at least seqno(O') + * because O on the primary captured that O' was updated (either by O' or by O" with seqno(O') < seqno(O") < seqno(O)). + * Thus MUS_r(O) >= MSU_p(O) >= seqno(O') > LCP which contradicts the assumption [MSU_r(O) <= LCP]. + * + * 4.3) MSU(O) < seqno(O) then docID(O) does not exist when O is applied on a leader. This means docID(O) does not exist * after we apply every operation with docID = docID(O) and seqno < seqno(O). On the follower, we have applied every - * operation with seqno <= LCP, and there is no such O' with docID(O’) = docID(O) and LCP < seqno(O’)[4.1]. + * operation with seqno <= LCP, and there is no such O' with docID(O’) = docID(O) and LCP < seqno(O’)[4.1 and 4.2]. * These mean the follower has applied every operation with docID = docID(O) and seqno < seqno(O). * Thus docID(O) does not exist on the follower. 
*/ final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes(); assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; - if (containsOperation(index)) { + if (hasBeenProcessedBefore(index)) { return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); } else if (maxSeqNoOfUpdatesOrDeletes <= getLocalCheckpoint()) { From adb99338907934d863c85fb795cb02609b395cfd Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 28 Sep 2018 01:31:21 -0400 Subject: [PATCH 04/10] wording --- .../xpack/ccr/index/engine/FollowingEngine.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 195db1c4c1aee..3e67cfde229b2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -66,22 +66,20 @@ protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Ind * * 3. When replicating operations to replicas or followers, we also carry the max seq_no_of_updates_or_deletes on the * leader to followers. This transfer guarantees the MUS on a follower when operation O is processed at least the - * MUS on the leader when it was executed [MSU_r(O) >= MSU_p(O)]. + * MUS on the leader when it was executed [every operation O => MSU_r(O) >= MSU_p(O)]. * * 4. The following proves that docID(O) does not exist on a follower when operation O is applied if MSU_r(O) <= LCP < seqno(O): * - * 4.1) If such operation O' with docID(O’) = docID(O), and LCP < seqno(O) < seqno(O’), then MSU_r(O) >= MSU_r(O') because - * O' was delivered to the follower before O. 
MUS_p(O') is at least seqno(O) because O' on the primary captured - * that O was updated (either by O' or by O" with seqno(O) < seqno(O") < seqno(O')). - * Thus MUS_r(O) >= MSU_r(O') >= MSU_p(O') >= seqno(O) > LCP which contradicts the assumption [MSU_r(O) <= LCP]. + * 4.1) Given two operations O and O' with docID(O’) = docID(O) and seqno(O) < seqno(O’) then MSU_p(O') on the primary + * must be at least seqno(O). Moreover, the MSU_r on a follower >= min(seqno(O), seqno(O')) after these operations + * arrive in any order. * - * 4.2) If such operation O' with docID(O') = docID(O), and LCP < seqno(O') < seqno(O), then MSU_p(O) is at least seqno(O') - * because O on the primary captured that O' was updated (either by O' or by O" with seqno(O') < seqno(O") < seqno(O)). - * Thus MUS_r(O) >= MSU_p(O) >= seqno(O') > LCP which contradicts the assumption [MSU_r(O) <= LCP]. + * 4.2) If such operation O' with docID(O’) = docID(O) and LCP < seqno(O’) then MSU_r(O) >= min(seqno(O), seqno(O')) > LCP + * because both arrived on the follower[4.1]. This contradicts the assumption [MSU_r(O) <= LCP]. * * 4.3) MSU(O) < seqno(O) then docID(O) does not exist when O is applied on a leader. This means docID(O) does not exist * after we apply every operation with docID = docID(O) and seqno < seqno(O). On the follower, we have applied every - * operation with seqno <= LCP, and there is no such O' with docID(O’) = docID(O) and LCP < seqno(O’)[4.1 and 4.2]. + * operation with seqno <= LCP, and there is no such O' with docID(O’) = docID(O) and LCP < seqno(O’)[4.2]. * These mean the follower has applied every operation with docID = docID(O) and seqno < seqno(O). * Thus docID(O) does not exist on the follower. 
*/ From be86b4f0a7a283cf56bfd041812aabba47fdce46 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 28 Sep 2018 09:20:54 -0400 Subject: [PATCH 05/10] shuffle snapshot --- .../index/engine/FollowingEngineTests.java | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 8f6cec1ea2806..ce67cfe2d4484 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -45,6 +45,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -491,7 +492,8 @@ private void fetchOperations(AtomicBoolean stopped, AtomicLong lastFetchedSeqNo, // extends the fetch range so we may deliver some overlapping operations more than once. 
final long fromSeqNo = randomLongBetween(Math.max(lastSeqNo - 5, 0), lastSeqNo + 1); final long toSeqNo = randomLongBetween(nextSeqNo, Math.min(nextSeqNo + 5, checkpoint)); - try (Translog.Snapshot snapshot = leader.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true)) { + try (Translog.Snapshot snapshot = + shuffleSnapshot(leader.newChangesSnapshot("test", mapperService, fromSeqNo, toSeqNo, true))) { follower.advanceMaxSeqNoOfUpdatesOrDeletes(leader.getMaxSeqNoOfUpdatesOrDeletes()); translogHandler.run(follower, snapshot); } @@ -499,4 +501,34 @@ private void fetchOperations(AtomicBoolean stopped, AtomicLong lastFetchedSeqNo, } } } + + private Translog.Snapshot shuffleSnapshot(Translog.Snapshot snapshot) throws IOException { + final List operations = new ArrayList<>(snapshot.totalOperations()); + Translog.Operation op; + while ((op = snapshot.next()) != null) { + operations.add(op); + } + Randomness.shuffle(operations); + final Iterator iterator = operations.iterator(); + + return new Translog.Snapshot() { + @Override + public int totalOperations() { + return snapshot.totalOperations(); + } + + @Override + public Translog.Operation next() { + if (iterator.hasNext()) { + return iterator.next(); + } + return null; + } + + @Override + public void close() throws IOException { + snapshot.close(); + } + }; + } } From 91f419b2d9d373509c9d31b623b442a7ad5efe43 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 28 Sep 2018 11:01:06 -0400 Subject: [PATCH 06/10] update note --- .../ccr/index/engine/FollowingEngine.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 3e67cfde229b2..7d9564fbf7bbc 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ 
b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -60,9 +60,8 @@ protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Ind * 1. Indexing operations are processed concurrently in an engine. However, operations of the same docID are processed * one by one under the docID lock. * - * 2. An engine itself can resolve correctly if an operation is delivered multiple times. However, if an operation is - * optimized and delivered multiple times, it will be appended into Lucene more than once. We void this issue by - * not executing operations which have been processed before (using LocalCheckpointTracker). + * 2. Operations that are optimized using the MSU optimization will not be processed twice as this will create duplicates + * in Lucene. To avoid it we check the local checkpoint tracker to see if an operation was already processed. * * 3. When replicating operations to replicas or followers, we also carry the max seq_no_of_updates_or_deletes on the * leader to followers. This transfer guarantees the MUS on a follower when operation O is processed at least the @@ -70,16 +69,18 @@ protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Ind * * 4. The following proves that docID(O) does not exist on a follower when operation O is applied if MSU_r(O) <= LCP < seqno(O): * - * 4.1) Given two operations O and O' with docID(O’) = docID(O) and seqno(O) < seqno(O’) then MSU_p(O') on the primary - * must be at least seqno(O). Moreover, the MSU_r on a follower >= min(seqno(O), seqno(O')) after these operations - * arrive in any order. + * 4.1) If such operation O' on the follower with docID(O’) = docID(O) and LCP < seqno(O) < seqno(O’) then MUS_p(O') >= seqno(O') + * because O' is an update or MUS_p(O') >= seqno(delete O") with docID(O") = docID(O) and seqno(O) < seqno(O") < seqno(O'). 
+ * MSU_r(O) >= MSU_r(O') because O' was delivered to the follower before O, and MSU_p(O') > seqno(O) > LCP in both cases. + * Thus MUS_r(O) >= MSU_r(O') >= MSU_p(O') > seqno(O) > LCP which contradicts the assumption [MSU_r(O) <= LCP] * - * 4.2) If such operation O' with docID(O’) = docID(O) and LCP < seqno(O’) then MSU_r(O) >= min(seqno(O), seqno(O')) > LCP - * because both arrived on the follower[4.1]. This contradicts the assumption [MSU_r(O) <= LCP]. + * 4.2) If such operation O' in the history with docID(O’) = docID(O), and LCP < seqno(O') < seqno(O) then MSU_p(O) >= seqno(O) + * because O is an update or MSU_p(O) >= seqno(delete O") with docID(O") = docID(O) and seqno(O') < seqno(O") < seqno(O). + * Thus MSU_r(O) >= MSU_p(O) > seqno(O') > LCP in both cases which contradicts the assumption [MSU_r(O) <= LCP]. * * 4.3) MSU(O) < seqno(O) then docID(O) does not exist when O is applied on a leader. This means docID(O) does not exist * after we apply every operation with docID = docID(O) and seqno < seqno(O). On the follower, we have applied every - * operation with seqno <= LCP, and there is no such O' with docID(O’) = docID(O) and LCP < seqno(O’)[4.2]. + * operation with seqno <= LCP, and there is no such O' in the history with docID(O’) = docID(O) and LCP < seqno(O’)[4.2]. * These mean the follower has applied every operation with docID = docID(O) and seqno < seqno(O). * Thus docID(O) does not exist on the follower. 
*/ From f34b447be2ed2a11c2606084abc5c27a693cf041 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 28 Sep 2018 15:40:50 -0400 Subject: [PATCH 07/10] use proof from david and yannick --- .../elasticsearch/index/engine/Engine.java | 26 ++++++++++++++++ .../ccr/index/engine/FollowingEngine.java | 31 +------------------ 2 files changed, 27 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index c45517e956728..8a9d7627ef6a8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1834,6 +1834,32 @@ public interface TranslogRecoveryRunner { * Returns the maximum sequence number of either update or delete operations have been processed in this engine * or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered * as an update operation if it overwrites the existing documents in Lucene index with the same document id. + *

+ * A note on the optimization using max_seq_no_of_updates_or_deletes: + * For each operation O, the key invariants are: + *

    + *
  1. I1. There is no operation on docID(O) with seqno that is > MSU(O) and < seqno(O)
  2. + *
  3. I2. If MSU(O) < seqno(O) then docID(O) did not exist when O was applied; more precisely, if there is any O' + * * with seqno(O') < seqno(O) and docID(O') = docID(O) then the one with the greatest seqno is a delete.
  4. + *
+ *

+ * When a receiving shard (either a replica or a follower) receives an operation O, it must first ensure its own MSU is >= MSU(O), + * and then compares its MSU to its local checkpoint (LCP). If LCP < MSU then there’s a gap: there may be some operations that act + * on docID(O) about which we do not yet know, so we cannot perform an add. Note this also covers the case where a future operation O' + * with seqNo(O') > seqNo(O) and docId(O') = docID(O) is processed before O. In that case MSU(O') is at least seqNo(O') and this + * means MSU >= seqNo(O') > seqNo(O) > LCP (because O wasn't processed yet). + *

+ * However, if MSU <= LCP then there is no gap: we have processed every operation <= LCP, and no operation O' with seqno(O') > LCP + * and seqno(O') < seqno(O) also has docID(O') = docID(O), because such an operation would have seqno(O') > LCP >= MSU >= MSU(O) + * which contradicts the first invariant. Furthermore in this case we immediately know that docID(O) has been deleted + * (or never existed) without needing to check Lucene for the following reason. If there's no earlier operation on docID(O) then + * this is clear, so suppose instead that the preceding operation on docID(O) is O': + * 1. The first invariant above tells us that seqno(O') <= MSU(O) <= LCP so we have already applied O' to Lucene. + * 2. Also MSU(O) <= MSU <= LCP < seqno(O) (we discard O if seqno(O) ≤ LCP) so the second invariant applies, + * meaning that the O' was a delete. + *

+ * Moreover, operations that are optimized using the MSU optimization will not be processed twice as this will create duplicates + * in Lucene. To avoid it we check the local checkpoint tracker to see if an operation was already processed. * * @see #initializeMaxSeqNoOfUpdatesOrDeletes() * @see #advanceMaxSeqNoOfUpdatesOrDeletes(long) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 7d9564fbf7bbc..458461f3c8457 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -54,36 +54,7 @@ private void preFlight(final Operation operation) { @Override protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException { preFlight(index); - /* - * A note about optimization using sequence numbers: - * - * 1. Indexing operations are processed concurrently in an engine. However, operations of the same docID are processed - * one by one under the docID lock. - * - * 2. Operations that are optimized using the MSU optimization will not be processed twice as this will create duplicates - * in Lucene. To avoid it we check the local checkpoint tracker to see if an operation was already processed. - * - * 3. When replicating operations to replicas or followers, we also carry the max seq_no_of_updates_or_deletes on the - * leader to followers. This transfer guarantees the MUS on a follower when operation O is processed at least the - * MUS on the leader when it was executed [every operation O => MSU_r(O) >= MSU_p(O)]. - * - * 4. 
The following proves that docID(O) does not exist on a follower when operation O is applied if MSU_r(O) <= LCP < seqno(O): - * - * 4.1) If such operation O' on the follower with docID(O’) = docID(O) and LCP < seqno(O) < seqno(O’) then MUS_p(O') >= seqno(O') - * because O' is an update or MUS_p(O') >= seqno(delete O") with docID(O") = docID(O) and seqno(O) < seqno(O") < seqno(O'). - * MSU_r(O) >= MSU_r(O') because O' was delivered to the follower before O, and MSU_p(O') > seqno(O) > LCP in both cases. - * Thus MUS_r(O) >= MSU_r(O') >= MSU_p(O') > seqno(O) > LCP which contradicts the assumption [MSU_r(O) <= LCP] - * - * 4.2) If such operation O' in the history with docID(O’) = docID(O), and LCP < seqno(O') < seqno(O) then MSU_p(O) >= seqno(O) - * because O is an update or MSU_p(O) >= seqno(delete O") with docID(O") = docID(O) and seqno(O') < seqno(O") < seqno(O). - * Thus MSU_r(O) >= MSU_p(O) > seqno(O') > LCP in both cases which contradicts the assumption [MSU_r(O) <= LCP]. - * - * 4.3) MSU(O) < seqno(O) then docID(O) does not exist when O is applied on a leader. This means docID(O) does not exist - * after we apply every operation with docID = docID(O) and seqno < seqno(O). On the follower, we have applied every - * operation with seqno <= LCP, and there is no such O' in the history with docID(O’) = docID(O) and LCP < seqno(O’)[4.2]. - * These mean the follower has applied every operation with docID = docID(O) and seqno < seqno(O). - * Thus docID(O) does not exist on the follower. - */ + // NOTES: refer Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers. 
final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes(); assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; if (hasBeenProcessedBefore(index)) { From 516c99ec6e9cdc41f00e1606400f0fd57e0e388e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 28 Sep 2018 15:42:23 -0400 Subject: [PATCH 08/10] typo --- .../main/java/org/elasticsearch/index/engine/Engine.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 8a9d7627ef6a8..149ad154965b9 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1838,9 +1838,9 @@ public interface TranslogRecoveryRunner { * A note on the optimization using max_seq_no_of_updates_or_deletes: * For each operation O, the key invariants are: *

    - *
  1. I1. There is no operation on docID(O) with seqno that is > MSU(O) and < seqno(O)
  2. - *
  3. I2. If MSU(O) < seqno(O) then docID(O) did not exist when O was applied; more precisely, if there is any O' - * * with seqno(O') < seqno(O) and docID(O') = docID(O) then the one with the greatest seqno is a delete.
  4. + *
  5. I1: There is no operation on docID(O) with seqno that is > MSU(O) and < seqno(O)
  6. + *
  7. I2: If MSU(O) < seqno(O) then docID(O) did not exist when O was applied; more precisely, if there is any O' + * with seqno(O') < seqno(O) and docID(O') = docID(O) then the one with the greatest seqno is a delete.
  8. *
*

* When a receiving shard (either a replica or a follower) receives an operation O, it must first ensure its own MSU is >= MSU(O), From 6f807cf714a6242e268bda8305703a9eb85897e0 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 28 Sep 2018 15:53:14 -0400 Subject: [PATCH 09/10] add conclusion --- .../src/main/java/org/elasticsearch/index/engine/Engine.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 149ad154965b9..9ba00760376e8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1858,8 +1858,9 @@ public interface TranslogRecoveryRunner { * 2. Also MSU(O) <= MSU <= LCP < seqno(O) (we discard O if seqno(O) ≤ LCP) so the second invariant applies, * meaning that the O' was a delete. *

- * Moreover, operations that are optimized using the MSU optimization will not be processed twice as this will create duplicates - * in Lucene. To avoid it we check the local checkpoint tracker to see if an operation was already processed. + * Therefore, if MSU <= LCP < seqno(O) we know that O can safely be optimized with and added to lucene with addDocument. Moreover, + * operations that are optimized using the MSU optimization must not be processed twice as this will create duplicates in Lucene. + * To avoid this we check the local checkpoint tracker to see if an operation was already processed. * * @see #initializeMaxSeqNoOfUpdatesOrDeletes() * @see #advanceMaxSeqNoOfUpdatesOrDeletes(long) From d79d26e8fc3186afb03c1dfb0f66e40033ffa8dc Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 28 Sep 2018 17:28:28 -0400 Subject: [PATCH 10/10] stylecheck --- .../elasticsearch/index/engine/Engine.java | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 9ba00760376e8..460501c8b5238 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1838,29 +1838,29 @@ public interface TranslogRecoveryRunner { * A note on the optimization using max_seq_no_of_updates_or_deletes: * For each operation O, the key invariants are: *

    - *
  1. I1: There is no operation on docID(O) with seqno that is > MSU(O) and < seqno(O)
  2. - *
  3. I2: If MSU(O) < seqno(O) then docID(O) did not exist when O was applied; more precisely, if there is any O' - * with seqno(O') < seqno(O) and docID(O') = docID(O) then the one with the greatest seqno is a delete.
  4. + *
  5. I1: There is no operation on docID(O) with seqno that is {@literal > MSU(O) and < seqno(O)}
  6. + *
  7. I2: If {@literal MSU(O) < seqno(O)} then docID(O) did not exist when O was applied; more precisely, if there is any O' + * with {@literal seqno(O') < seqno(O) and docID(O') = docID(O)} then the one with the greatest seqno is a delete.
  8. *
*

- * When a receiving shard (either a replica or a follower) receives an operation O, it must first ensure its own MSU is >= MSU(O), - * and then compares its MSU to its local checkpoint (LCP). If LCP < MSU then there’s a gap: there may be some operations that act - * on docID(O) about which we do not yet know, so we cannot perform an add. Note this also covers the case where a future operation O' - * with seqNo(O') > seqNo(O) and docId(O') = docID(O) is processed before O. In that case MSU(O') is at least seqNo(O') and this - * means MSU >= seqNo(O') > seqNo(O) > LCP (because O wasn't processed yet). + * When a receiving shard (either a replica or a follower) receives an operation O, it must first ensure its own MSU is at least MSU(O), + * and then compare its MSU to its local checkpoint (LCP). If {@literal LCP < MSU} then there's a gap: there may be some operations + * that act on docID(O) about which we do not yet know, so we cannot perform an add. Note this also covers the case where a future + * operation O' with {@literal seqNo(O') > seqNo(O) and docId(O') = docID(O)} is processed before O. In that case MSU(O') is at least + * seqno(O') and this means {@literal MSU >= seqNo(O') > seqNo(O) > LCP} (because O wasn't processed yet). *

- * However, if MSU <= LCP then there is no gap: we have processed every operation <= LCP, and no operation O' with seqno(O') > LCP - * and seqno(O') < seqno(O) also has docID(O') = docID(O), because such an operation would have seqno(O') > LCP >= MSU >= MSU(O) - * which contradicts the first invariant. Furthermore in this case we immediately know that docID(O) has been deleted - * (or never existed) without needing to check Lucene for the following reason. If there's no earlier operation on docID(O) then - * this is clear, so suppose instead that the preceding operation on docID(O) is O': - * 1. The first invariant above tells us that seqno(O') <= MSU(O) <= LCP so we have already applied O' to Lucene. - * 2. Also MSU(O) <= MSU <= LCP < seqno(O) (we discard O if seqno(O) ≤ LCP) so the second invariant applies, - * meaning that the O' was a delete. + * However, if {@literal MSU <= LCP} then there is no gap: we have processed every {@literal operation <= LCP}, and no operation O' + * with {@literal seqno(O') > LCP and seqno(O') < seqno(O) also has docID(O') = docID(O)}, because such an operation would have + * {@literal seqno(O') > LCP >= MSU >= MSU(O)} which contradicts the first invariant. Furthermore in this case we immediately know + * that docID(O) has been deleted (or never existed) without needing to check Lucene for the following reason. If there's no earlier + * operation on docID(O) then this is clear, so suppose instead that the preceding operation on docID(O) is O': + * 1. The first invariant above tells us that {@literal seqno(O') <= MSU(O) <= LCP} so we have already applied O' to Lucene. + * 2. Also {@literal MSU(O) <= MSU <= LCP < seqno(O)} (we discard O if {@literal seqno(O) <= LCP}) so the second invariant applies, + * meaning that the O' was a delete. *

- * Therefore, if MSU <= LCP < seqno(O) we know that O can safely be optimized with and added to lucene with addDocument. Moreover, - * operations that are optimized using the MSU optimization must not be processed twice as this will create duplicates in Lucene. - * To avoid this we check the local checkpoint tracker to see if an operation was already processed. + * Therefore, if {@literal MSU <= LCP < seqno(O)} we know that O can safely be optimized and added to Lucene with addDocument. + * Moreover, operations that are optimized using the MSU optimization must not be processed twice as this will create duplicates + * in Lucene. To avoid this we check the local checkpoint tracker to see if an operation was already processed. * * @see #initializeMaxSeqNoOfUpdatesOrDeletes() * @see #advanceMaxSeqNoOfUpdatesOrDeletes(long)