From 688cf832fe14400ec01fa1fe5d3319e24a198f73 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 5 Jul 2019 18:55:24 -0400 Subject: [PATCH] Enable indexing optimization using sequence numbers on replicas (#43616) This PR enables the indexing optimization using sequence numbers on replicas. With this optimization, indexing on replicas should be faster and use less memory as it can forgo the version lookup when possible. This change also deactivates the append-only optimization on replicas. Relates #34099 --- .../index/engine/InternalEngine.java | 89 +++++-------- .../index/engine/InternalEngineTests.java | 126 +++--------------- .../engine/LuceneChangesSnapshotTests.java | 12 +- .../IndexLevelReplicationTests.java | 34 +++++ .../index/shard/IndexShardTests.java | 1 + .../index/engine/EngineTestCase.java | 44 ++++++ .../index/shard/IndexShardTestCase.java | 4 + .../ccr/index/engine/FollowingEngine.java | 48 ++----- .../ShardFollowTaskReplicationTests.java | 4 +- .../index/engine/FollowingEngineTests.java | 34 +++-- 10 files changed, 169 insertions(+), 227 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index e9fb2dd90f278..cfdb61a6964df 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -156,7 +156,6 @@ public class InternalEngine extends Engine { private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1); - private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); // max_seq_no_of_updates_or_deletes tracks the max seq_no of update or delete operations that have been processed in this engine. // An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index. // The value of this marker never goes backwards, and is tracked/updated differently on primary and replica. @@ -409,17 +408,11 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException { private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) { for (Map.Entry entry : writer.getLiveCommitData()) { - final String key = entry.getKey(); - if (key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) { + if (MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID.equals(entry.getKey())) { assert maxUnsafeAutoIdTimestamp.get() == -1 : "max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]"; updateAutoIdTimestamp(Long.parseLong(entry.getValue()), true); } - if (key.equals(SequenceNumbers.MAX_SEQ_NO)) { - assert maxSeqNoOfNonAppendOnlyOperations.get() == -1 : - "max unsafe append-only seq# was assigned already [" + maxSeqNoOfNonAppendOnlyOperations.get() + "]"; - maxSeqNoOfNonAppendOnlyOperations.set(Long.parseLong(entry.getValue())); - } } } @@ -946,46 +939,35 @@ public IndexResult index(Index index) throws IOException { protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException { assert assertNonPrimaryOrigin(index); + // needs to maintain the auto_id timestamp in case this replica becomes primary + if (canOptimizeAddDocument(index)) { + mayHaveBeenIndexedBefore(index); + } final IndexingStrategy plan; - final boolean appendOnlyRequest = canOptimizeAddDocument(index); - if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && index.seqNo() > maxSeqNoOfNonAppendOnlyOperations.get()) { - /* - * As soon as an append-only request was indexed into the primary, it can be exposed to a search then users can issue - * a follow-up operation on it. In rare cases, the follow up operation can be arrived and processed on a replica before - * the original append-only. In this case we can't simply proceed with the append only without consulting the version map. - * If a replica has seen a non-append-only operation with a higher seqno than the seqno of an append-only, it may have seen - * the document of that append-only request. However if the seqno of an append-only is higher than seqno of any non-append-only - * requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization. - */ - assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]"; - plan = IndexingStrategy.optimizedAppendOnly(1L); + // unlike the primary, replicas don't really care to about creation status of documents + // this allows to ignore the case where a document was found in the live version maps in + // a delete state and return false for the created flag in favor of code simplicity + final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes(); + if (hasBeenProcessedBefore(index)) { + // the operation seq# was processed and thus the same operation was already put into lucene + // this can happen during recovery where older operations are sent from the translog that are already + // part of the lucene commit (either from a peer recovery or a local translog) + // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in + // question may have been deleted in an out of order op that is not replayed. + // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery + // See testRecoveryWithOutOfOrderDelete for an example of peer recovery + plan = IndexingStrategy.processButSkipLucene(false, index.version()); + } else if (maxSeqNoOfUpdatesOrDeletes <= localCheckpointTracker.getProcessedCheckpoint()) { + // see Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers + assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : index.seqNo() + ">=" + maxSeqNoOfUpdatesOrDeletes; + plan = IndexingStrategy.optimizedAppendOnly(index.version()); } else { - if (appendOnlyRequest == false) { - maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr)); - assert maxSeqNoOfNonAppendOnlyOperations.get() >= index.seqNo() : "max_seqno of non-append-only was not updated;" + - "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]"; - } versionMap.enforceSafeAccess(); - // unlike the primary, replicas don't really care to about creation status of documents - // this allows to ignore the case where a document was found in the live version maps in - // a delete state and return false for the created flag in favor of code simplicity - if (index.seqNo() <= localCheckpointTracker.getProcessedCheckpoint()){ - // the operation seq# is lower then the current local checkpoint and thus was already put into lucene - // this can happen during recovery where older operations are sent from the translog that are already - // part of the lucene commit (either from a peer recovery or a local translog) - // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in - // question may have been deleted in an out of order op that is not replayed. - // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery - // See testRecoveryWithOutOfOrderDelete for an example of peer recovery - plan = IndexingStrategy.processButSkipLucene(false, index.version()); + final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); + if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { + plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.version()); } else { - final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); - if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.version()); - } else { - plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, - index.version()); - } + plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version()); } } return plan; @@ -1115,11 +1097,6 @@ private boolean mayHaveBeenIndexedBefore(Index index) { return mayHaveBeenIndexBefore; } - // for testing - long getMaxSeqNoOfNonAppendOnlyOperations() { - return maxSeqNoOfNonAppendOnlyOperations.get(); - } - private void addDocs(final List docs, final IndexWriter indexWriter) throws IOException { if (docs.size() > 1) { indexWriter.addDocuments(docs); @@ -1168,7 +1145,7 @@ private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpda Optional.of(earlyResultOnPreFlightError); } - public static IndexingStrategy optimizedAppendOnly(long versionForIndexing) { + static IndexingStrategy optimizedAppendOnly(long versionForIndexing) { return new IndexingStrategy(true, false, true, false, versionForIndexing, null); } @@ -1313,15 +1290,9 @@ protected DeletionStrategy deletionStrategyForOperation(final Delete delete) thr protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException { assert assertNonPrimaryOrigin(delete); - maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr)); - assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" + - "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]"; - // unlike the primary, replicas don't really care to about found status of documents - // this allows to ignore the case where a document was found in the live version maps in - // a delete state and return true for the found flag in favor of code simplicity final DeletionStrategy plan; - if (delete.seqNo() <= localCheckpointTracker.getProcessedCheckpoint()) { - // the operation seq# is lower then the current local checkpoint and thus was already put into lucene + if (hasBeenProcessedBefore(delete)) { + // the operation seq# was processed thus this operation was already put into lucene // this can happen during recovery where older operations are sent from the translog that are already // part of the lucene commit (either from a peer recovery or a local translog) // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in @@ -1498,7 +1469,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException { noOpResult = new NoOpResult(getPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, preFlightError.get()); } else { markSeqNoAsSeen(noOp.seqNo()); - if (softDeleteEnabled) { + if (softDeleteEnabled && hasBeenProcessedBefore(noOp) == false) { try { final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason()); tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm()); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 5259829de89f7..599133eb43d0a 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -260,7 +260,9 @@ public void testVersionMapAfterAutoIDDocument() throws IOException { try (Engine.Searcher searcher = engine.acquireSearcher("test")) { assertEquals(2, searcher.getIndexReader().numDocs()); } - assertFalse("safe access should NOT be required last indexing round was only append only", engine.isSafeAccessRequired()); + if (operation.origin() == PRIMARY) { + assertFalse("safe access should NOT be required last indexing round was only append only", engine.isSafeAccessRequired()); + } engine.delete(new Engine.Delete(operation.type(), operation.id(), operation.uid(), primaryTerm.get())); assertTrue("safe access should be required", engine.isSafeAccessRequired()); engine.refresh("test"); @@ -3461,7 +3463,7 @@ public void testDoubleDeliveryReplicaAppendingAndDeleteOnly() throws IOException operation.versionType(), REPLICA, operation.startTime()+1, UNASSIGNED_SEQ_NO, 0); // operations with a seq# equal or lower to the local checkpoint are not indexed to lucene // and the version lookup is skipped - final boolean belowLckp = operation.seqNo() == 0 && retry.seqNo() == 0; + final boolean sameSeqNo = operation.seqNo() == retry.seqNo(); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); assertLuceneOperations(engine, 1, 0, 0); @@ -3471,19 +3473,19 @@ public void testDoubleDeliveryReplicaAppendingAndDeleteOnly() throws IOException assertEquals(1, engine.getNumVersionLookups()); assertLuceneOperations(engine, 1, 0, 1); Engine.IndexResult retryResult = engine.index(retry); - assertEquals(belowLckp ? 1 : 2, engine.getNumVersionLookups()); + assertEquals(sameSeqNo ? 1 : 2, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0); } else { Engine.IndexResult retryResult = engine.index(retry); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(1, engine.getNumVersionLookups()); + assertEquals(0, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); engine.delete(delete); assertLuceneOperations(engine, 1, 0, 1); - assertEquals(2, engine.getNumVersionLookups()); + assertEquals(1, engine.getNumVersionLookups()); Engine.IndexResult indexResult = engine.index(operation); - assertEquals(belowLckp ? 2 : 3, engine.getNumVersionLookups()); + assertEquals(sameSeqNo ? 1 : 2, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); } @@ -3502,7 +3504,7 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { Engine.Index retry = appendOnlyReplica(doc.get(), true, 1, randomIntBetween(0, 5)); // operations with a seq# equal or lower to the local checkpoint are not indexed to lucene // and the version lookup is skipped - final boolean belowLckp = operation.seqNo() == 0 && retry.seqNo() == 0; + final boolean sameSeqNo = operation.seqNo() == retry.seqNo(); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); assertLuceneOperations(engine, 1, 0, 0); @@ -3514,13 +3516,13 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { } else { assertLuceneOperations(engine, 1, 0, 0); } - assertEquals(belowLckp ? 0 : 1, engine.getNumVersionLookups()); + assertEquals(sameSeqNo ? 0 : 1, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0); } else { Engine.IndexResult retryResult = engine.index(retry); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(1, engine.getNumVersionLookups()); + assertEquals(0, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); Engine.IndexResult indexResult = engine.index(operation); if (operation.seqNo() > retry.seqNo()) { @@ -3528,7 +3530,7 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { } else { assertLuceneOperations(engine, 1, 0, 0); } - assertEquals(belowLckp ? 1 : 2, engine.getNumVersionLookups()); + assertEquals(sameSeqNo ? 0 : 1, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); } @@ -3569,27 +3571,27 @@ public void testDoubleDeliveryReplica() throws IOException { if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(1, engine.getNumVersionLookups()); + assertEquals(0, engine.getNumVersionLookups()); assertNotNull(indexResult.getTranslogLocation()); if (randomBoolean()) { engine.refresh("test"); } Engine.IndexResult retryResult = engine.index(duplicate); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(2, engine.getNumVersionLookups()); + assertEquals(0, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0); } else { Engine.IndexResult retryResult = engine.index(duplicate); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(1, engine.getNumVersionLookups()); + assertEquals(0, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); if (randomBoolean()) { engine.refresh("test"); } Engine.IndexResult indexResult = engine.index(operation); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(2, engine.getNumVersionLookups()); + assertEquals(0, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); } @@ -5177,102 +5179,6 @@ public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception { } } - public void testTrackMaxSeqNoOfNonAppendOnlyOperations() throws Exception { - IOUtils.close(engine, store); - store = createStore(); - final Path translogPath = createTempDir(); - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) { - final CountDownLatch latch = new CountDownLatch(1); - final Thread appendOnlyIndexer = new Thread(() -> { - try { - latch.countDown(); - final int numDocs = scaledRandomIntBetween(100, 1000); - for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = - testParsedDocument("append-only" + i, null, testDocumentWithTextField(), SOURCE, null); - if (randomBoolean()) { - engine.index(appendOnlyReplica(doc, randomBoolean(), 1, generateNewSeqNo(engine))); - } else { - engine.index(appendOnlyPrimary(doc, randomBoolean(), randomNonNegativeLong())); - } - } - } catch (Exception ex) { - throw new RuntimeException("Failed to index", ex); - } - }); - appendOnlyIndexer.setName("append-only indexer"); - appendOnlyIndexer.start(); - latch.await(); - long maxSeqNoOfNonAppendOnly = SequenceNumbers.NO_OPS_PERFORMED; - final int numOps = scaledRandomIntBetween(100, 1000); - for (int i = 0; i < numOps; i++) { - ParsedDocument parsedDocument = - testParsedDocument(Integer.toString(i), null, testDocumentWithTextField(), SOURCE, null); - if (randomBoolean()) { // On replica - update max_seqno for non-append-only operations - final long seqno = generateNewSeqNo(engine); - final Engine.Index doc = replicaIndexForDoc(parsedDocument, 1, seqno, randomBoolean()); - if (randomBoolean()) { - engine.index(doc); - } else { - engine.delete(new Engine.Delete(doc.type(), doc.id(), doc.uid(), seqno, doc.primaryTerm(), - doc.version(), doc.versionType(), doc.origin(), threadPool.relativeTimeInMillis(), UNASSIGNED_SEQ_NO, 0)); - } - maxSeqNoOfNonAppendOnly = seqno; - } else { // On primary - do not update max_seqno for non-append-only operations - if (randomBoolean()) { - engine.index(indexForDoc(parsedDocument)); - } else { - engine.delete(new Engine.Delete(parsedDocument.type(), parsedDocument.id(), - newUid(parsedDocument.id()), primaryTerm.get())); - } - } - } - appendOnlyIndexer.join(120_000); - assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(maxSeqNoOfNonAppendOnly)); - engine.syncTranslog(); - globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); - engine.flush(); - } - try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) { - assertThat("max_seqno from non-append-only was not bootstrap from the safe commit", - engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(globalCheckpoint.get())); - } - } - - public void testSkipOptimizeForExposedAppendOnlyOperations() throws Exception { - long lookupTimes = 0L; - final int initDocs = between(0, 10); - for (int i = 0; i < initDocs; i++) { - index(engine, i); - lookupTimes++; - } - // doc1 is delayed and arrived after a non-append-only op. - final long seqNoAppendOnly1 = generateNewSeqNo(engine); - final long seqnoNormalOp = generateNewSeqNo(engine); - if (randomBoolean()) { - engine.index(replicaIndexForDoc( - testParsedDocument("d", null, testDocumentWithTextField(), SOURCE, null), 1, seqnoNormalOp, false)); - } else { - engine.delete(replicaDeleteForDoc("d", 1, seqnoNormalOp, randomNonNegativeLong())); - } - lookupTimes++; - assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); - assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(seqnoNormalOp)); - - // should not optimize for doc1 and process as a regular doc (eg. look up in version map) - engine.index(appendOnlyReplica(testParsedDocument("append-only-1", null, testDocumentWithTextField(), SOURCE, null), - false, randomNonNegativeLong(), seqNoAppendOnly1)); - lookupTimes++; - assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); - - // optimize for other append-only 2 (its seqno > max_seqno of non-append-only) - do not look up in version map. - engine.index(appendOnlyReplica(testParsedDocument("append-only-2", null, - testDocumentWithTextField(), SOURCE, null), - false, randomNonNegativeLong(), generateNewSeqNo(engine))); - assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); - } - public void testTrimUnsafeCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final int maxSeqNo = 40; diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java index fd7d0122766f8..30b4db9c90368 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java @@ -150,18 +150,16 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f } /** - * If an operation above the local checkpoint is delivered multiple times, an engine will add multiple copies of that operation - * into Lucene (only the first copy is non-stale; others are stale and soft-deleted). Moreover, a nested document is indexed into - * Lucene as multiple documents (only the root document has both seq_no and term, non-root docs only have seq_no). This test verifies - * that {@link LuceneChangesSnapshot} returns exactly one operation per seq_no, and skip non-root nested documents or stale copies. + * A nested document is indexed into Lucene as multiple documents. While the root document has both sequence number and primary term, + * non-root documents don't have primary term but only sequence numbers. This test verifies that {@link LuceneChangesSnapshot} + * correctly skip non-root documents and returns at most one operation per sequence number. */ - public void testSkipStaleOrNonRootOfNestedDocuments() throws Exception { + public void testSkipNonRootOfNestedDocuments() throws Exception { Map seqNoToTerm = new HashMap<>(); List operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean()); int totalOps = 0; for (Engine.Operation op : operations) { - // Engine skips deletes or indexes below the local checkpoint - if (engine.getProcessedLocalCheckpoint() < op.seqNo() || op instanceof Engine.NoOp) { + if (engine.getLocalCheckpointTracker().hasProcessed(op.seqNo()) == false) { seqNoToTerm.put(op.seqNo(), op.primaryTerm()); if (op instanceof Engine.Index) { totalOps += ((Engine.Index) op).docs().size(); diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 19f3c0a5c7e39..64c0f5f0b6269 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngineTests; import org.elasticsearch.index.engine.SegmentsStats; @@ -59,8 +60,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Future; @@ -86,6 +89,9 @@ public void testSimpleReplication() throws Exception { final int docCount = randomInt(50); shards.indexDocs(docCount); shards.assertAllEqual(docCount); + for (IndexShard replica : shards.getReplicas()) { + assertThat(EngineTestCase.getNumVersionLookups(getEngine(replica)), equalTo(0L)); + } } } @@ -95,6 +101,9 @@ public void testSimpleAppendOnlyReplication() throws Exception { final int docCount = randomInt(50); shards.appendDocs(docCount); shards.assertAllEqual(docCount); + for (IndexShard replica : shards.getReplicas()) { + assertThat(EngineTestCase.getNumVersionLookups(getEngine(replica)), equalTo(0L)); + } } } @@ -646,4 +655,29 @@ public void testOutOfOrderDeliveryForAppendOnlyOperations() throws Exception { shards.assertAllEqual(0); } } + + public void testIndexingOptimizationUsingSequenceNumbers() throws Exception { + final Set liveDocs = new HashSet<>(); + try (ReplicationGroup group = createGroup(2)) { + group.startAll(); + int numDocs = randomIntBetween(1, 100); + long versionLookups = 0; + for (int i = 0; i < numDocs; i++) { + String id = Integer.toString(randomIntBetween(1, 100)); + if (randomBoolean()) { + group.index(new IndexRequest(index.getName(), "type", id).source("{}", XContentType.JSON)); + if (liveDocs.add(id) == false) { + versionLookups++; + } + } else { + group.delete(new DeleteRequest(index.getName(), "type", id)); + liveDocs.remove(id); + versionLookups++; + } + } + for (IndexShard replica : group.getReplicas()) { + assertThat(EngineTestCase.getNumVersionLookups(getEngine(replica)), equalTo(versionLookups)); + } + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 50f9ce8e8b18c..84470b38fda6a 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -3989,6 +3989,7 @@ public void testTypelessDelete() throws IOException { IndexMetaData metaData = IndexMetaData.builder("index") .putMapping("some_type", "{ \"properties\": {}}") .settings(settings) + .primaryTerm(0, 1) .build(); IndexShard shard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); recoverShardFromStore(shard); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 3428476a39645..fd1eda21ce6cf 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -44,6 +44,7 @@ import org.apache.lucene.search.Sort; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TotalHitCountCollector; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; @@ -75,6 +76,7 @@ import org.elasticsearch.index.MapperTestUtils; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.fieldvisitor.IdOnlyFieldVisitor; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; @@ -112,6 +114,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -265,11 +268,13 @@ public void tearDown() throws Exception { engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test")); assertMaxSeqNoInCommitUserData(engine); + assertAtMostOneLuceneDocumentPerSequenceNumber(engine); } if (replicaEngine != null && replicaEngine.isClosed.get() == false) { replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test")); assertMaxSeqNoInCommitUserData(replicaEngine); + assertAtMostOneLuceneDocumentPerSequenceNumber(replicaEngine); } assertThat(engine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L)); assertThat(replicaEngine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L)); @@ -1118,6 +1123,38 @@ public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exceptio } } + public static void assertAtMostOneLuceneDocumentPerSequenceNumber(Engine engine) throws IOException { + if (engine.config().getIndexSettings().isSoftDeleteEnabled() == false || engine instanceof InternalEngine == false) { + return; + } + try { + engine.refresh("test"); + try (Engine.Searcher searcher = engine.acquireSearcher("test")) { + DirectoryReader reader = Lucene.wrapAllDocsLive(searcher.getDirectoryReader()); + Set seqNos = new HashSet<>(); + for (LeafReaderContext leaf : reader.leaves()) { + NumericDocValues primaryTermDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); + NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + int docId; + while ((docId = seqNoDocValues.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + assertTrue(seqNoDocValues.advanceExact(docId)); + long seqNo = seqNoDocValues.longValue(); + assertThat(seqNo, greaterThanOrEqualTo(0L)); + if (primaryTermDocValues.advanceExact(docId)) { + if (seqNos.add(seqNo) == false) { + final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor(); + leaf.reader().document(docId, idFieldVisitor); + throw new AssertionError("found multiple documents for seq=" + seqNo + " id=" + idFieldVisitor.getId()); + } + } + } + } + } + } catch (AlreadyClosedException ignored) { + + } + } + public static MapperService createMapperService(String type) throws IOException { IndexMetaData indexMetaData = IndexMetaData.builder("test") .settings(Settings.builder() @@ -1187,4 +1224,11 @@ static long maxSeqNosInReader(DirectoryReader reader) throws IOException { } return maxSeqNo; } + + /** + * Returns the number of times a version was looked up either from version map or from the index. + */ + public static long getNumVersionLookups(Engine engine) { + return ((InternalEngine) engine).getNumVersionLookups(); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index de914c94ebf49..47a8f73ef62c9 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -519,6 +519,10 @@ protected void closeShard(IndexShard shard, boolean assertConsistencyBetweenTran if (assertConsistencyBetweenTranslogAndLucene) { assertConsistentHistoryBetweenTranslogAndLucene(shard); } + final Engine engine = shard.getEngineOrNull(); + if (engine != null) { + EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber(engine); + } } finally { IOUtils.close(() -> shard.close("test", false), shard.store()); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 8d4f0b219bd2c..fd9c449de7e0c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -18,7 +18,6 @@ import org.apache.lucene.search.TopDocs; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.InternalEngine; @@ -36,7 +35,6 @@ */ public final class FollowingEngine extends InternalEngine { - private final CounterMetric numOfOptimizedIndexing = new CounterMetric(); /** * Construct a new following engine with the specified engine configuration. @@ -68,32 +66,18 @@ private void preFlight(final Operation operation) { @Override protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException { preFlight(index); - // NOTES: refer Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers. - final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes(); - assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; - if (hasBeenProcessedBefore(index)) { - if (logger.isTraceEnabled()) { - logger.trace("index operation [id={} seq_no={} origin={}] was processed before", index.id(), index.seqNo(), index.origin()); - } - if (index.origin() == Operation.Origin.PRIMARY) { - /* - * The existing operation in this engine was probably assigned the term of the previous primary shard which is different - * from the term of the current operation. If the current operation arrives on replicas before the previous operation, - * then the Lucene content between the primary and replicas are not identical (primary terms are different). We can safely - * skip the existing operations below the global checkpoint, however must replicate the ones above the global checkpoint - * but with the previous primary term (not the current term of the operation) in order to guarantee the consistency - * between the primary and replicas (see TransportBulkShardOperationsAction#shardOperationOnPrimary). - */ - final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException( - shardId, index.seqNo(), lookupPrimaryTerm(index.seqNo())); - return IndexingStrategy.skipDueToVersionConflict(error, false, index.version(), index.primaryTerm()); - } else { - return IndexingStrategy.processButSkipLucene(false, index.version()); - } - } else if (maxSeqNoOfUpdatesOrDeletes <= getProcessedLocalCheckpoint()) { - assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : "seq_no[" + index.seqNo() + "] <= msu[" + maxSeqNoOfUpdatesOrDeletes + "]"; - numOfOptimizedIndexing.inc(); - return InternalEngine.IndexingStrategy.optimizedAppendOnly(index.version()); + if (index.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(index)) { + /* + * The existing operation in this engine was probably assigned the term of the previous primary shard which is different + * from the term of the current operation. If the current operation arrives on replicas before the previous operation, + * then the Lucene content between the primary and replicas are not identical (primary terms are different). We can safely + * skip the existing operations below the global checkpoint, however must replicate the ones above the global checkpoint + * but with the previous primary term (not the current term of the operation) in order to guarantee the consistency + * between the primary and replicas (see TransportBulkShardOperationsAction#shardOperationOnPrimary). + */ + final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException( + shardId, index.seqNo(), lookupPrimaryTerm(index.seqNo())); + return IndexingStrategy.skipDueToVersionConflict(error, false, index.version(), index.primaryTerm()); } else { return planIndexingAsNonPrimary(index); } @@ -202,14 +186,6 @@ private OptionalLong lookupPrimaryTerm(final long seqNo) throws IOException { } } - /** - * Returns the number of indexing operations that have been optimized (bypass version lookup) using sequence numbers in this engine. - * This metric is not persisted, and started from 0 when the engine is opened. - */ - public long getNumberOfOptimizedIndexing() { - return numOfOptimizedIndexing.count(); - } - @Override public void verifyEngineBeforeIndexClosing() throws IllegalStateException { // the value of the global checkpoint is not verified when the following engine is closed, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index f88b6542392c8..1d3e5ad7e4f6a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.IndexShard; @@ -55,7 +56,6 @@ import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; -import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; import java.io.IOException; @@ -110,7 +110,7 @@ public void testSimpleCcrReplication() throws Exception { followerGroup.assertAllEqual(indexedDocIds.size()); }); for (IndexShard shard : followerGroup) { - assertThat(((FollowingEngine) (getEngine(shard))).getNumberOfOptimizedIndexing(), equalTo((long) docCount)); + assertThat(EngineTestCase.getNumVersionLookups(getEngine(shard)), equalTo(0L)); } // Deletes should be replicated to the follower List deleteDocIds = randomSubsetOf(indexedDocIds); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 98bfa1b2068bb..e6fba3f5741ae 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -58,7 +58,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import static org.elasticsearch.index.engine.EngineTestCase.createMapperService; import static org.elasticsearch.index.engine.EngineTestCase.getDocIds; +import static org.elasticsearch.index.engine.EngineTestCase.getNumVersionLookups; import static org.elasticsearch.index.engine.EngineTestCase.getTranslog; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -339,23 +341,27 @@ public void testBasicOptimization() throws Exception { } EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L)); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs)); + assertThat(getNumVersionLookups(follower), equalTo(0L)); assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); // Do not apply optimization for deletes or updates + long versionLookUps = 0; for (int i = 0; i < numDocs; i++) { if (randomBoolean()) { + versionLookUps++; leader.index(indexForPrimary(Integer.toString(i))); } else if (randomBoolean()) { + versionLookUps++; leader.delete(deleteForPrimary(Integer.toString(i))); } } EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); + assertThat(getNumVersionLookups(follower), greaterThanOrEqualTo(versionLookUps)); assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes())); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs)); assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); // Apply optimization for documents that do not exist long moreDocs = between(1, 100); + versionLookUps = getNumVersionLookups(follower); Set docIds = getDocIds(follower, true).stream().map(doc -> doc.getId()).collect(Collectors.toSet()); for (int i = 0; i < moreDocs; i++) { String docId = randomValueOtherThanMany(docIds::contains, () -> Integer.toString(between(1, 1000))); @@ -364,7 +370,7 @@ public void testBasicOptimization() throws Exception { } EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes())); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs + moreDocs)); + assertThat(getNumVersionLookups(follower), equalTo(versionLookUps)); assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); }); } @@ -379,7 +385,7 @@ public void testOptimizeAppendOnly() throws Exception { EngineTestCase.concurrentlyApplyOps(ops, leader); assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L)); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo((long) numOps)); + assertThat(getNumVersionLookups(follower), equalTo(0L)); }); } @@ -397,13 +403,14 @@ public void testOptimizeMultipleVersions() throws Exception { runFollowTest((leader, follower) -> { EngineTestCase.concurrentlyApplyOps(ops, leader); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); + long numVersionLookups = getNumVersionLookups(follower); final List appendOps = new ArrayList<>(); for (int numAppends = scaledRandomIntBetween(0, 100), i = 0; i < numAppends; i++) { appendOps.add(indexForPrimary("append-" + i)); } EngineTestCase.concurrentlyApplyOps(appendOps, leader); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), greaterThanOrEqualTo((long) appendOps.size())); + assertThat(getNumVersionLookups(follower), equalTo(numVersionLookups)); }); } @@ -411,19 +418,19 @@ public void testOptimizeSingleDocSequentially() throws Exception { runFollowTest((leader, follower) -> { leader.index(indexForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(1L)); + assertThat(getNumVersionLookups(follower), equalTo(0L)); leader.delete(deleteForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(1L)); + assertThat(getNumVersionLookups(follower), equalTo(1L)); leader.index(indexForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(2L)); + assertThat(getNumVersionLookups(follower), equalTo(1L)); leader.index(indexForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(2L)); + assertThat(getNumVersionLookups(follower), equalTo(2L)); }); } @@ -434,19 +441,18 @@ public void testOptimizeSingleDocConcurrently() throws Exception { EngineTestCase.concurrentlyApplyOps(ops, leader); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); - long numOptimized = follower.getNumberOfOptimizedIndexing(); leader.delete(deleteForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized)); + long numVersionLookups = getNumVersionLookups(follower); leader.index(indexForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized + 1L)); + assertThat(getNumVersionLookups(follower), equalTo(numVersionLookups)); leader.index(indexForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized + 1L)); + assertThat(getNumVersionLookups(follower), equalTo(numVersionLookups + 1L)); }); } @@ -480,6 +486,8 @@ private void runFollowTest(CheckedBiConsumer