diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index e9fb2dd90f278..cfdb61a6964df 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -156,7 +156,6 @@ public class InternalEngine extends Engine { private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1); - private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); // max_seq_no_of_updates_or_deletes tracks the max seq_no of update or delete operations that have been processed in this engine. // An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index. // The value of this marker never goes backwards, and is tracked/updated differently on primary and replica. @@ -409,17 +408,11 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException { private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) { for (Map.Entry entry : writer.getLiveCommitData()) { - final String key = entry.getKey(); - if (key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) { + if (MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID.equals(entry.getKey())) { assert maxUnsafeAutoIdTimestamp.get() == -1 : "max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]"; updateAutoIdTimestamp(Long.parseLong(entry.getValue()), true); } - if (key.equals(SequenceNumbers.MAX_SEQ_NO)) { - assert maxSeqNoOfNonAppendOnlyOperations.get() == -1 : - "max unsafe append-only seq# was assigned already [" + maxSeqNoOfNonAppendOnlyOperations.get() + "]"; - maxSeqNoOfNonAppendOnlyOperations.set(Long.parseLong(entry.getValue())); - } } } @@ -946,46 +939,35 @@ public IndexResult index(Index index) throws IOException { protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException { assert assertNonPrimaryOrigin(index); + // needs to maintain the auto_id timestamp in case this replica becomes primary + if (canOptimizeAddDocument(index)) { + mayHaveBeenIndexedBefore(index); + } final IndexingStrategy plan; - final boolean appendOnlyRequest = canOptimizeAddDocument(index); - if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && index.seqNo() > maxSeqNoOfNonAppendOnlyOperations.get()) { - /* - * As soon as an append-only request was indexed into the primary, it can be exposed to a search then users can issue - * a follow-up operation on it. In rare cases, the follow up operation can be arrived and processed on a replica before - * the original append-only. In this case we can't simply proceed with the append only without consulting the version map. - * If a replica has seen a non-append-only operation with a higher seqno than the seqno of an append-only, it may have seen - * the document of that append-only request. However if the seqno of an append-only is higher than seqno of any non-append-only - * requests, we can assert the replica have not seen the document of that append-only request, thus we can apply optimization. - */ - assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]"; - plan = IndexingStrategy.optimizedAppendOnly(1L); + // unlike the primary, replicas don't really care to about creation status of documents + // this allows to ignore the case where a document was found in the live version maps in + // a delete state and return false for the created flag in favor of code simplicity + final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes(); + if (hasBeenProcessedBefore(index)) { + // the operation seq# was processed and thus the same operation was already put into lucene + // this can happen during recovery where older operations are sent from the translog that are already + // part of the lucene commit (either from a peer recovery or a local translog) + // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in + // question may have been deleted in an out of order op that is not replayed. + // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery + // See testRecoveryWithOutOfOrderDelete for an example of peer recovery + plan = IndexingStrategy.processButSkipLucene(false, index.version()); + } else if (maxSeqNoOfUpdatesOrDeletes <= localCheckpointTracker.getProcessedCheckpoint()) { + // see Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers + assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : index.seqNo() + ">=" + maxSeqNoOfUpdatesOrDeletes; + plan = IndexingStrategy.optimizedAppendOnly(index.version()); } else { - if (appendOnlyRequest == false) { - maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr)); - assert maxSeqNoOfNonAppendOnlyOperations.get() >= index.seqNo() : "max_seqno of non-append-only was not updated;" + - "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]"; - } versionMap.enforceSafeAccess(); - // unlike the primary, replicas don't really care to about creation status of documents - // this allows to ignore the case where a document was found in the live version maps in - // a delete state and return false for the created flag in favor of code simplicity - if (index.seqNo() <= localCheckpointTracker.getProcessedCheckpoint()){ - // the operation seq# is lower then the current local checkpoint and thus was already put into lucene - // this can happen during recovery where older operations are sent from the translog that are already - // part of the lucene commit (either from a peer recovery or a local translog) - // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in - // question may have been deleted in an out of order op that is not replayed. - // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery - // See testRecoveryWithOutOfOrderDelete for an example of peer recovery - plan = IndexingStrategy.processButSkipLucene(false, index.version()); + final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); + if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { + plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.version()); } else { - final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); - if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.version()); - } else { - plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, - index.version()); - } + plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version()); } } return plan; @@ -1115,11 +1097,6 @@ private boolean mayHaveBeenIndexedBefore(Index index) { return mayHaveBeenIndexBefore; } - // for testing - long getMaxSeqNoOfNonAppendOnlyOperations() { - return maxSeqNoOfNonAppendOnlyOperations.get(); - } - private void addDocs(final List docs, final IndexWriter indexWriter) throws IOException { if (docs.size() > 1) { indexWriter.addDocuments(docs); @@ -1168,7 +1145,7 @@ private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpda Optional.of(earlyResultOnPreFlightError); } - public static IndexingStrategy optimizedAppendOnly(long versionForIndexing) { + static IndexingStrategy optimizedAppendOnly(long versionForIndexing) { return new IndexingStrategy(true, false, true, false, versionForIndexing, null); } @@ -1313,15 +1290,9 @@ protected DeletionStrategy deletionStrategyForOperation(final Delete delete) thr protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException { assert assertNonPrimaryOrigin(delete); - maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr)); - assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" + - "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]"; - // unlike the primary, replicas don't really care to about found status of documents - // this allows to ignore the case where a document was found in the live version maps in - // a delete state and return true for the found flag in favor of code simplicity final DeletionStrategy plan; - if (delete.seqNo() <= localCheckpointTracker.getProcessedCheckpoint()) { - // the operation seq# is lower then the current local checkpoint and thus was already put into lucene + if (hasBeenProcessedBefore(delete)) { + // the operation seq# was processed thus this operation was already put into lucene // this can happen during recovery where older operations are sent from the translog that are already // part of the lucene commit (either from a peer recovery or a local translog) // or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in @@ -1498,7 +1469,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException { noOpResult = new NoOpResult(getPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, preFlightError.get()); } else { markSeqNoAsSeen(noOp.seqNo()); - if (softDeleteEnabled) { + if (softDeleteEnabled && hasBeenProcessedBefore(noOp) == false) { try { final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason()); tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm()); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 5259829de89f7..599133eb43d0a 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -260,7 +260,9 @@ public void testVersionMapAfterAutoIDDocument() throws IOException { try (Engine.Searcher searcher = engine.acquireSearcher("test")) { assertEquals(2, searcher.getIndexReader().numDocs()); } - assertFalse("safe access should NOT be required last indexing round was only append only", engine.isSafeAccessRequired()); + if (operation.origin() == PRIMARY) { + assertFalse("safe access should NOT be required last indexing round was only append only", engine.isSafeAccessRequired()); + } engine.delete(new Engine.Delete(operation.type(), operation.id(), operation.uid(), primaryTerm.get())); assertTrue("safe access should be required", engine.isSafeAccessRequired()); engine.refresh("test"); @@ -3461,7 +3463,7 @@ public void testDoubleDeliveryReplicaAppendingAndDeleteOnly() throws IOException operation.versionType(), REPLICA, operation.startTime()+1, UNASSIGNED_SEQ_NO, 0); // operations with a seq# equal or lower to the local checkpoint are not indexed to lucene // and the version lookup is skipped - final boolean belowLckp = operation.seqNo() == 0 && retry.seqNo() == 0; + final boolean sameSeqNo = operation.seqNo() == retry.seqNo(); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); assertLuceneOperations(engine, 1, 0, 0); @@ -3471,19 +3473,19 @@ public void testDoubleDeliveryReplicaAppendingAndDeleteOnly() throws IOException assertEquals(1, engine.getNumVersionLookups()); assertLuceneOperations(engine, 1, 0, 1); Engine.IndexResult retryResult = engine.index(retry); - assertEquals(belowLckp ? 1 : 2, engine.getNumVersionLookups()); + assertEquals(sameSeqNo ? 1 : 2, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0); } else { Engine.IndexResult retryResult = engine.index(retry); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(1, engine.getNumVersionLookups()); + assertEquals(0, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); engine.delete(delete); assertLuceneOperations(engine, 1, 0, 1); - assertEquals(2, engine.getNumVersionLookups()); + assertEquals(1, engine.getNumVersionLookups()); Engine.IndexResult indexResult = engine.index(operation); - assertEquals(belowLckp ? 2 : 3, engine.getNumVersionLookups()); + assertEquals(sameSeqNo ? 1 : 2, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); } @@ -3502,7 +3504,7 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { Engine.Index retry = appendOnlyReplica(doc.get(), true, 1, randomIntBetween(0, 5)); // operations with a seq# equal or lower to the local checkpoint are not indexed to lucene // and the version lookup is skipped - final boolean belowLckp = operation.seqNo() == 0 && retry.seqNo() == 0; + final boolean sameSeqNo = operation.seqNo() == retry.seqNo(); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); assertLuceneOperations(engine, 1, 0, 0); @@ -3514,13 +3516,13 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { } else { assertLuceneOperations(engine, 1, 0, 0); } - assertEquals(belowLckp ? 0 : 1, engine.getNumVersionLookups()); + assertEquals(sameSeqNo ? 0 : 1, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0); } else { Engine.IndexResult retryResult = engine.index(retry); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(1, engine.getNumVersionLookups()); + assertEquals(0, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); Engine.IndexResult indexResult = engine.index(operation); if (operation.seqNo() > retry.seqNo()) { @@ -3528,7 +3530,7 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { } else { assertLuceneOperations(engine, 1, 0, 0); } - assertEquals(belowLckp ? 1 : 2, engine.getNumVersionLookups()); + assertEquals(sameSeqNo ? 0 : 1, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); } @@ -3569,27 +3571,27 @@ public void testDoubleDeliveryReplica() throws IOException { if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(1, engine.getNumVersionLookups()); + assertEquals(0, engine.getNumVersionLookups()); assertNotNull(indexResult.getTranslogLocation()); if (randomBoolean()) { engine.refresh("test"); } Engine.IndexResult retryResult = engine.index(duplicate); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(2, engine.getNumVersionLookups()); + assertEquals(0, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) > 0); } else { Engine.IndexResult retryResult = engine.index(duplicate); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(1, engine.getNumVersionLookups()); + assertEquals(0, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); if (randomBoolean()) { engine.refresh("test"); } Engine.IndexResult indexResult = engine.index(operation); assertLuceneOperations(engine, 1, 0, 0); - assertEquals(2, engine.getNumVersionLookups()); + assertEquals(0, engine.getNumVersionLookups()); assertNotNull(retryResult.getTranslogLocation()); assertTrue(retryResult.getTranslogLocation().compareTo(indexResult.getTranslogLocation()) < 0); } @@ -5177,102 +5179,6 @@ public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception { } } - public void testTrackMaxSeqNoOfNonAppendOnlyOperations() throws Exception { - IOUtils.close(engine, store); - store = createStore(); - final Path translogPath = createTempDir(); - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) { - final CountDownLatch latch = new CountDownLatch(1); - final Thread appendOnlyIndexer = new Thread(() -> { - try { - latch.countDown(); - final int numDocs = scaledRandomIntBetween(100, 1000); - for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = - testParsedDocument("append-only" + i, null, testDocumentWithTextField(), SOURCE, null); - if (randomBoolean()) { - engine.index(appendOnlyReplica(doc, randomBoolean(), 1, generateNewSeqNo(engine))); - } else { - engine.index(appendOnlyPrimary(doc, randomBoolean(), randomNonNegativeLong())); - } - } - } catch (Exception ex) { - throw new RuntimeException("Failed to index", ex); - } - }); - appendOnlyIndexer.setName("append-only indexer"); - appendOnlyIndexer.start(); - latch.await(); - long maxSeqNoOfNonAppendOnly = SequenceNumbers.NO_OPS_PERFORMED; - final int numOps = scaledRandomIntBetween(100, 1000); - for (int i = 0; i < numOps; i++) { - ParsedDocument parsedDocument = - testParsedDocument(Integer.toString(i), null, testDocumentWithTextField(), SOURCE, null); - if (randomBoolean()) { // On replica - update max_seqno for non-append-only operations - final long seqno = generateNewSeqNo(engine); - final Engine.Index doc = replicaIndexForDoc(parsedDocument, 1, seqno, randomBoolean()); - if (randomBoolean()) { - engine.index(doc); - } else { - engine.delete(new Engine.Delete(doc.type(), doc.id(), doc.uid(), seqno, doc.primaryTerm(), - doc.version(), doc.versionType(), doc.origin(), threadPool.relativeTimeInMillis(), UNASSIGNED_SEQ_NO, 0)); - } - maxSeqNoOfNonAppendOnly = seqno; - } else { // On primary - do not update max_seqno for non-append-only operations - if (randomBoolean()) { - engine.index(indexForDoc(parsedDocument)); - } else { - engine.delete(new Engine.Delete(parsedDocument.type(), parsedDocument.id(), - newUid(parsedDocument.id()), primaryTerm.get())); - } - } - } - appendOnlyIndexer.join(120_000); - assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(maxSeqNoOfNonAppendOnly)); - engine.syncTranslog(); - globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); - engine.flush(); - } - try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) { - assertThat("max_seqno from non-append-only was not bootstrap from the safe commit", - engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(globalCheckpoint.get())); - } - } - - public void testSkipOptimizeForExposedAppendOnlyOperations() throws Exception { - long lookupTimes = 0L; - final int initDocs = between(0, 10); - for (int i = 0; i < initDocs; i++) { - index(engine, i); - lookupTimes++; - } - // doc1 is delayed and arrived after a non-append-only op. - final long seqNoAppendOnly1 = generateNewSeqNo(engine); - final long seqnoNormalOp = generateNewSeqNo(engine); - if (randomBoolean()) { - engine.index(replicaIndexForDoc( - testParsedDocument("d", null, testDocumentWithTextField(), SOURCE, null), 1, seqnoNormalOp, false)); - } else { - engine.delete(replicaDeleteForDoc("d", 1, seqnoNormalOp, randomNonNegativeLong())); - } - lookupTimes++; - assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); - assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(seqnoNormalOp)); - - // should not optimize for doc1 and process as a regular doc (eg. look up in version map) - engine.index(appendOnlyReplica(testParsedDocument("append-only-1", null, testDocumentWithTextField(), SOURCE, null), - false, randomNonNegativeLong(), seqNoAppendOnly1)); - lookupTimes++; - assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); - - // optimize for other append-only 2 (its seqno > max_seqno of non-append-only) - do not look up in version map. - engine.index(appendOnlyReplica(testParsedDocument("append-only-2", null, - testDocumentWithTextField(), SOURCE, null), - false, randomNonNegativeLong(), generateNewSeqNo(engine))); - assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); - } - public void testTrimUnsafeCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final int maxSeqNo = 40; diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java index fd7d0122766f8..30b4db9c90368 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java @@ -150,18 +150,16 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f } /** - * If an operation above the local checkpoint is delivered multiple times, an engine will add multiple copies of that operation - * into Lucene (only the first copy is non-stale; others are stale and soft-deleted). Moreover, a nested document is indexed into - * Lucene as multiple documents (only the root document has both seq_no and term, non-root docs only have seq_no). This test verifies - * that {@link LuceneChangesSnapshot} returns exactly one operation per seq_no, and skip non-root nested documents or stale copies. + * A nested document is indexed into Lucene as multiple documents. While the root document has both sequence number and primary term, + * non-root documents don't have primary term but only sequence numbers. This test verifies that {@link LuceneChangesSnapshot} + * correctly skip non-root documents and returns at most one operation per sequence number. */ - public void testSkipStaleOrNonRootOfNestedDocuments() throws Exception { + public void testSkipNonRootOfNestedDocuments() throws Exception { Map seqNoToTerm = new HashMap<>(); List operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean()); int totalOps = 0; for (Engine.Operation op : operations) { - // Engine skips deletes or indexes below the local checkpoint - if (engine.getProcessedLocalCheckpoint() < op.seqNo() || op instanceof Engine.NoOp) { + if (engine.getLocalCheckpointTracker().hasProcessed(op.seqNo()) == false) { seqNoToTerm.put(op.seqNo(), op.primaryTerm()); if (op instanceof Engine.Index) { totalOps += ((Engine.Index) op).docs().size(); diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 19f3c0a5c7e39..64c0f5f0b6269 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -39,6 +39,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngineTests; import org.elasticsearch.index.engine.SegmentsStats; @@ -59,8 +60,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Future; @@ -86,6 +89,9 @@ public void testSimpleReplication() throws Exception { final int docCount = randomInt(50); shards.indexDocs(docCount); shards.assertAllEqual(docCount); + for (IndexShard replica : shards.getReplicas()) { + assertThat(EngineTestCase.getNumVersionLookups(getEngine(replica)), equalTo(0L)); + } } } @@ -95,6 +101,9 @@ public void testSimpleAppendOnlyReplication() throws Exception { final int docCount = randomInt(50); shards.appendDocs(docCount); shards.assertAllEqual(docCount); + for (IndexShard replica : shards.getReplicas()) { + assertThat(EngineTestCase.getNumVersionLookups(getEngine(replica)), equalTo(0L)); + } } } @@ -646,4 +655,29 @@ public void testOutOfOrderDeliveryForAppendOnlyOperations() throws Exception { shards.assertAllEqual(0); } } + + public void testIndexingOptimizationUsingSequenceNumbers() throws Exception { + final Set liveDocs = new HashSet<>(); + try (ReplicationGroup group = createGroup(2)) { + group.startAll(); + int numDocs = randomIntBetween(1, 100); + long versionLookups = 0; + for (int i = 0; i < numDocs; i++) { + String id = Integer.toString(randomIntBetween(1, 100)); + if (randomBoolean()) { + group.index(new IndexRequest(index.getName(), "type", id).source("{}", XContentType.JSON)); + if (liveDocs.add(id) == false) { + versionLookups++; + } + } else { + group.delete(new DeleteRequest(index.getName(), "type", id)); + liveDocs.remove(id); + versionLookups++; + } + } + for (IndexShard replica : group.getReplicas()) { + assertThat(EngineTestCase.getNumVersionLookups(getEngine(replica)), equalTo(versionLookups)); + } + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 50f9ce8e8b18c..84470b38fda6a 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -3989,6 +3989,7 @@ public void testTypelessDelete() throws IOException { IndexMetaData metaData = IndexMetaData.builder("index") .putMapping("some_type", "{ \"properties\": {}}") .settings(settings) + .primaryTerm(0, 1) .build(); IndexShard shard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); recoverShardFromStore(shard); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 3428476a39645..fd1eda21ce6cf 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -44,6 +44,7 @@ import org.apache.lucene.search.Sort; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TotalHitCountCollector; +import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; @@ -75,6 +76,7 @@ import org.elasticsearch.index.MapperTestUtils; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.fieldvisitor.IdOnlyFieldVisitor; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; @@ -112,6 +114,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -265,11 +268,13 @@ public void tearDown() throws Exception { engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test")); assertMaxSeqNoInCommitUserData(engine); + assertAtMostOneLuceneDocumentPerSequenceNumber(engine); } if (replicaEngine != null && replicaEngine.isClosed.get() == false) { replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test")); assertMaxSeqNoInCommitUserData(replicaEngine); + assertAtMostOneLuceneDocumentPerSequenceNumber(replicaEngine); } assertThat(engine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L)); assertThat(replicaEngine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L)); @@ -1118,6 +1123,38 @@ public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exceptio } } + public static void assertAtMostOneLuceneDocumentPerSequenceNumber(Engine engine) throws IOException { + if (engine.config().getIndexSettings().isSoftDeleteEnabled() == false || engine instanceof InternalEngine == false) { + return; + } + try { + engine.refresh("test"); + try (Engine.Searcher searcher = engine.acquireSearcher("test")) { + DirectoryReader reader = Lucene.wrapAllDocsLive(searcher.getDirectoryReader()); + Set seqNos = new HashSet<>(); + for (LeafReaderContext leaf : reader.leaves()) { + NumericDocValues primaryTermDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); + NumericDocValues seqNoDocValues = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + int docId; + while ((docId = seqNoDocValues.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + assertTrue(seqNoDocValues.advanceExact(docId)); + long seqNo = seqNoDocValues.longValue(); + assertThat(seqNo, greaterThanOrEqualTo(0L)); + if (primaryTermDocValues.advanceExact(docId)) { + if (seqNos.add(seqNo) == false) { + final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor(); + leaf.reader().document(docId, idFieldVisitor); + throw new AssertionError("found multiple documents for seq=" + seqNo + " id=" + idFieldVisitor.getId()); + } + } + } + } + } + } catch (AlreadyClosedException ignored) { + + } + } + public static MapperService createMapperService(String type) throws IOException { IndexMetaData indexMetaData = IndexMetaData.builder("test") .settings(Settings.builder() @@ -1187,4 +1224,11 @@ static long maxSeqNosInReader(DirectoryReader reader) throws IOException { } return maxSeqNo; } + + /** + * Returns the number of times a version was looked up either from version map or from the index. + */ + public static long getNumVersionLookups(Engine engine) { + return ((InternalEngine) engine).getNumVersionLookups(); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index de914c94ebf49..47a8f73ef62c9 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -519,6 +519,10 @@ protected void closeShard(IndexShard shard, boolean assertConsistencyBetweenTran if (assertConsistencyBetweenTranslogAndLucene) { assertConsistentHistoryBetweenTranslogAndLucene(shard); } + final Engine engine = shard.getEngineOrNull(); + if (engine != null) { + EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber(engine); + } } finally { IOUtils.close(() -> shard.close("test", false), shard.store()); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 8d4f0b219bd2c..fd9c449de7e0c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -18,7 +18,6 @@ import org.apache.lucene.search.TopDocs; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.InternalEngine; @@ -36,7 +35,6 @@ */ public final class FollowingEngine extends InternalEngine { - private final CounterMetric numOfOptimizedIndexing = new CounterMetric(); /** * Construct a new following engine with the specified engine configuration. @@ -68,32 +66,18 @@ private void preFlight(final Operation operation) { @Override protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException { preFlight(index); - // NOTES: refer Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers. - final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes(); - assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; - if (hasBeenProcessedBefore(index)) { - if (logger.isTraceEnabled()) { - logger.trace("index operation [id={} seq_no={} origin={}] was processed before", index.id(), index.seqNo(), index.origin()); - } - if (index.origin() == Operation.Origin.PRIMARY) { - /* - * The existing operation in this engine was probably assigned the term of the previous primary shard which is different - * from the term of the current operation. If the current operation arrives on replicas before the previous operation, - * then the Lucene content between the primary and replicas are not identical (primary terms are different). We can safely - * skip the existing operations below the global checkpoint, however must replicate the ones above the global checkpoint - * but with the previous primary term (not the current term of the operation) in order to guarantee the consistency - * between the primary and replicas (see TransportBulkShardOperationsAction#shardOperationOnPrimary). - */ - final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException( - shardId, index.seqNo(), lookupPrimaryTerm(index.seqNo())); - return IndexingStrategy.skipDueToVersionConflict(error, false, index.version(), index.primaryTerm()); - } else { - return IndexingStrategy.processButSkipLucene(false, index.version()); - } - } else if (maxSeqNoOfUpdatesOrDeletes <= getProcessedLocalCheckpoint()) { - assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : "seq_no[" + index.seqNo() + "] <= msu[" + maxSeqNoOfUpdatesOrDeletes + "]"; - numOfOptimizedIndexing.inc(); - return InternalEngine.IndexingStrategy.optimizedAppendOnly(index.version()); + if (index.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(index)) { + /* + * The existing operation in this engine was probably assigned the term of the previous primary shard which is different + * from the term of the current operation. If the current operation arrives on replicas before the previous operation, + * then the Lucene content between the primary and replicas are not identical (primary terms are different). We can safely + * skip the existing operations below the global checkpoint, however must replicate the ones above the global checkpoint + * but with the previous primary term (not the current term of the operation) in order to guarantee the consistency + * between the primary and replicas (see TransportBulkShardOperationsAction#shardOperationOnPrimary). + */ + final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException( + shardId, index.seqNo(), lookupPrimaryTerm(index.seqNo())); + return IndexingStrategy.skipDueToVersionConflict(error, false, index.version(), index.primaryTerm()); } else { return planIndexingAsNonPrimary(index); } @@ -202,14 +186,6 @@ private OptionalLong lookupPrimaryTerm(final long seqNo) throws IOException { } } - /** - * Returns the number of indexing operations that have been optimized (bypass version lookup) using sequence numbers in this engine. - * This metric is not persisted, and started from 0 when the engine is opened. - */ - public long getNumberOfOptimizedIndexing() { - return numOfOptimizedIndexing.count(); - } - @Override public void verifyEngineBeforeIndexClosing() throws IllegalStateException { // the value of the global checkpoint is not verified when the following engine is closed, diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index f88b6542392c8..1d3e5ad7e4f6a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.IndexShard; @@ -55,7 +56,6 @@ import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction; -import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; import java.io.IOException; @@ -110,7 +110,7 @@ public void testSimpleCcrReplication() throws Exception { followerGroup.assertAllEqual(indexedDocIds.size()); }); for (IndexShard shard : followerGroup) { - assertThat(((FollowingEngine) (getEngine(shard))).getNumberOfOptimizedIndexing(), equalTo((long) docCount)); + assertThat(EngineTestCase.getNumVersionLookups(getEngine(shard)), equalTo(0L)); } // Deletes should be replicated to the follower List deleteDocIds = randomSubsetOf(indexedDocIds); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 98bfa1b2068bb..e6fba3f5741ae 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -58,7 +58,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import static org.elasticsearch.index.engine.EngineTestCase.createMapperService; import static org.elasticsearch.index.engine.EngineTestCase.getDocIds; +import static org.elasticsearch.index.engine.EngineTestCase.getNumVersionLookups; import static org.elasticsearch.index.engine.EngineTestCase.getTranslog; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -339,23 +341,27 @@ public void testBasicOptimization() throws Exception { } EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L)); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs)); + assertThat(getNumVersionLookups(follower), equalTo(0L)); assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); // Do not apply optimization for deletes or updates + long versionLookUps = 0; for (int i = 0; i < numDocs; i++) { if (randomBoolean()) { + versionLookUps++; leader.index(indexForPrimary(Integer.toString(i))); } else if (randomBoolean()) { + versionLookUps++; leader.delete(deleteForPrimary(Integer.toString(i))); } } EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); + assertThat(getNumVersionLookups(follower), greaterThanOrEqualTo(versionLookUps)); assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes())); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs)); assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); // Apply optimization for documents that do not exist long moreDocs = between(1, 100); + versionLookUps = getNumVersionLookups(follower); Set docIds = getDocIds(follower, true).stream().map(doc -> doc.getId()).collect(Collectors.toSet()); for (int i = 0; i < moreDocs; i++) { String docId = randomValueOtherThanMany(docIds::contains, () -> Integer.toString(between(1, 1000))); @@ -364,7 +370,7 @@ public void testBasicOptimization() throws Exception { } EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getMaxSeqNoOfUpdatesOrDeletes())); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numDocs + moreDocs)); + assertThat(getNumVersionLookups(follower), equalTo(versionLookUps)); assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); }); } @@ -379,7 +385,7 @@ public void testOptimizeAppendOnly() throws Exception { EngineTestCase.concurrentlyApplyOps(ops, leader); assertThat(follower.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L)); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo((long) numOps)); + assertThat(getNumVersionLookups(follower), equalTo(0L)); }); } @@ -397,13 +403,14 @@ public void testOptimizeMultipleVersions() throws Exception { runFollowTest((leader, follower) -> { EngineTestCase.concurrentlyApplyOps(ops, leader); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); + long numVersionLookups = getNumVersionLookups(follower); final List appendOps = new ArrayList<>(); for (int numAppends = scaledRandomIntBetween(0, 100), i = 0; i < numAppends; i++) { appendOps.add(indexForPrimary("append-" + i)); } EngineTestCase.concurrentlyApplyOps(appendOps, leader); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), greaterThanOrEqualTo((long) appendOps.size())); + assertThat(getNumVersionLookups(follower), equalTo(numVersionLookups)); }); } @@ -411,19 +418,19 @@ public void testOptimizeSingleDocSequentially() throws Exception { runFollowTest((leader, follower) -> { leader.index(indexForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(1L)); + assertThat(getNumVersionLookups(follower), equalTo(0L)); leader.delete(deleteForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(1L)); + assertThat(getNumVersionLookups(follower), equalTo(1L)); leader.index(indexForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(2L)); + assertThat(getNumVersionLookups(follower), equalTo(1L)); leader.index(indexForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(2L)); + assertThat(getNumVersionLookups(follower), equalTo(2L)); }); } @@ -434,19 +441,18 @@ public void testOptimizeSingleDocConcurrently() throws Exception { EngineTestCase.concurrentlyApplyOps(ops, leader); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true))); - long numOptimized = follower.getNumberOfOptimizedIndexing(); leader.delete(deleteForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized)); + long numVersionLookups = getNumVersionLookups(follower); leader.index(indexForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized + 1L)); + assertThat(getNumVersionLookups(follower), equalTo(numVersionLookups)); leader.index(indexForPrimary("id")); EngineTestCase.waitForOpsToComplete(follower, leader.getProcessedLocalCheckpoint()); - assertThat(follower.getNumberOfOptimizedIndexing(), equalTo(numOptimized + 1L)); + assertThat(getNumVersionLookups(follower), equalTo(numVersionLookups + 1L)); }); } @@ -480,6 +486,8 @@ private void runFollowTest(CheckedBiConsumer