diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index f707ecc1fe65c..f50995048eadd 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -30,6 +30,7 @@ import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.FilterDirectoryReader; import org.apache.lucene.index.FilterLeafReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFileNames; @@ -833,6 +834,58 @@ public int length() { }; } + /** + * Wraps a directory reader so that all documents are treated as live (no live-docs filtering is applied). + * The wrapped reader can be used to query all documents, including deleted ones. + * + * @param in the input directory reader + * @return the wrapped reader + */ + public static DirectoryReader wrapAllDocsLive(DirectoryReader in) throws IOException { + return new DirectoryReaderWithAllLiveDocs(in); + } + + /** Directory reader that wraps every leaf in {@link SubReaderWithAllLiveDocs}. */ private static final class DirectoryReaderWithAllLiveDocs extends FilterDirectoryReader { + /** Leaf reader that hides deletions: getLiveDocs() is null and numDocs() equals maxDoc(). */ static final class SubReaderWithAllLiveDocs extends FilterLeafReader { + SubReaderWithAllLiveDocs(LeafReader in) { + super(in); + } + @Override + public Bits getLiveDocs() { + return null; + } + @Override + public int numDocs() { + return maxDoc(); + } + @Override + public CacheHelper getCoreCacheHelper() { + return in.getCoreCacheHelper(); + } + @Override + public CacheHelper getReaderCacheHelper() { + return null; // Modifying liveDocs + } + } + DirectoryReaderWithAllLiveDocs(DirectoryReader in) throws IOException { + super(in, new FilterDirectoryReader.SubReaderWrapper() { + @Override + public LeafReader wrap(LeafReader leaf) { + return new SubReaderWithAllLiveDocs(leaf); + } + }); + } + @Override + protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { + return 
wrapAllDocsLive(in); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return null; // Modifying liveDocs + } + } + /** * Returns a numeric docvalues which can be used to soft-delete documents. */ diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 191d01c78dbfe..88f271914239a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -21,12 +21,14 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.document.Field; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.MergePolicy; @@ -43,6 +45,7 @@ import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.InfoStream; +import org.elasticsearch.Assertions; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; @@ -769,10 +772,9 @@ public IndexResult index(Index index) throws IOException { if (plan.earlyResultOnPreFlightError.isPresent()) { indexResult = plan.earlyResultOnPreFlightError.get(); assert indexResult.hasFailure(); - } else if (plan.indexIntoLucene) { + } else if (plan.indexIntoLucene || plan.addStaleOpToLucene) { indexResult = indexIntoLucene(index, plan); } else { - // TODO: We need to index stale documents to have a full history in Lucene. 
indexResult = new IndexResult( plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); } @@ -838,7 +840,6 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO // unlike the primary, replicas don't really care to about creation status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return false for the created flag in favor of code simplicity - final OpVsLuceneDocStatus opVsLucene; if (index.seqNo() <= localCheckpointTracker.getCheckpoint()){ // the operation seq# is lower then the current local checkpoint and thus was already put into lucene // this can happen during recovery where older operations are sent from the translog that are already @@ -847,16 +848,15 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO // question may have been deleted in an out of order op that is not replayed. // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery // See testRecoveryWithOutOfOrderDelete for an example of peer recovery - opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; - } else { - opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); - } - if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); } else { - plan = IndexingStrategy.processNormally( - opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.seqNo(), index.version() - ); + final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); + if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { + plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version()); + } else { + plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, + index.seqNo(), index.version()); + } } } return plan; @@ -914,7 +914,7 @@ private IndexResult indexIntoLucene(Index 
index, IndexingStrategy plan) throws IOException { assert plan.seqNoForIndexing >= 0 : "ops should have an assigned seq no.; origin: " + index.origin(); assert plan.versionForIndexing >= 0 : "version must be set. got " + plan.versionForIndexing; - assert plan.indexIntoLucene; + assert plan.indexIntoLucene || plan.addStaleOpToLucene; /* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence * number service if this is on the primary, or the existing document's sequence number if this is on the replica. The * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created. @@ -922,7 +922,9 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) index.parsedDoc().updateSeqID(plan.seqNoForIndexing, index.primaryTerm()); index.parsedDoc().version().setLongValue(plan.versionForIndexing); try { - if (plan.useLuceneUpdateDocument) { + if (plan.addStaleOpToLucene) { + addStaleDocs(index.docs(), indexWriter); + } else if (plan.useLuceneUpdateDocument) { updateDocs(index.uid(), index.docs(), indexWriter); } else { // document does not exists, we can optimize for create, but double check if assertions are running @@ -986,16 +988,27 @@ private void addDocs(final List docs, final IndexWriter i numDocAppends.inc(docs.size()); } + /** Appends the docs of a stale operation to Lucene after tagging each doc with the soft-delete field; asserts that soft-deletes is enabled. */ private void addStaleDocs(final List docs, final IndexWriter indexWriter) throws IOException { + assert softDeleteEnabled : "Add history documents but soft-deletes is disabled"; + docs.forEach(d -> d.add(softDeleteField)); + if (docs.size() > 1) { + indexWriter.addDocuments(docs); + } else { + indexWriter.addDocument(docs.get(0)); + } + } + protected static final class IndexingStrategy { final boolean currentNotFoundOrDeleted; final boolean useLuceneUpdateDocument; final long seqNoForIndexing; final long versionForIndexing; final boolean indexIntoLucene; + final boolean addStaleOpToLucene; final Optional 
earlyResultOnPreFlightError; private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpdateDocument, - boolean indexIntoLucene, long seqNoForIndexing, + boolean indexIntoLucene, boolean addStaleOpToLucene, long seqNoForIndexing, long versionForIndexing, IndexResult earlyResultOnPreFlightError) { assert useLuceneUpdateDocument == false || indexIntoLucene : "use lucene update is set to true, but we're not indexing into lucene"; @@ -1008,37 +1021,40 @@ private IndexingStrategy(boolean currentNotFoundOrDeleted, boolean useLuceneUpda this.seqNoForIndexing = seqNoForIndexing; this.versionForIndexing = versionForIndexing; this.indexIntoLucene = indexIntoLucene; + this.addStaleOpToLucene = addStaleOpToLucene; this.earlyResultOnPreFlightError = earlyResultOnPreFlightError == null ? Optional.empty() : Optional.of(earlyResultOnPreFlightError); } static IndexingStrategy optimizedAppendOnly(long seqNoForIndexing) { - return new IndexingStrategy(true, false, true, seqNoForIndexing, 1, null); + return new IndexingStrategy(true, false, true, false, seqNoForIndexing, 1, null); } static IndexingStrategy skipDueToVersionConflict( VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion) { final IndexResult result = new IndexResult(e, currentVersion); return new IndexingStrategy( - currentNotFoundOrDeleted, false, false, SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, result); + currentNotFoundOrDeleted, false, false, false, SequenceNumbers.UNASSIGNED_SEQ_NO, Versions.NOT_FOUND, result); } static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, long seqNoForIndexing, long versionForIndexing) { return new IndexingStrategy(currentNotFoundOrDeleted, currentNotFoundOrDeleted == false, - true, seqNoForIndexing, versionForIndexing, null); + true, false, seqNoForIndexing, versionForIndexing, null); } static IndexingStrategy overrideExistingAsIfNotThere( long seqNoForIndexing, long versionForIndexing) { - return new 
IndexingStrategy(true, true, true, seqNoForIndexing, versionForIndexing, null); + return new IndexingStrategy(true, true, true, false, seqNoForIndexing, versionForIndexing, null); + } + + static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, long seqNoForIndexing, long versionForIndexing) { + return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, seqNoForIndexing, versionForIndexing, null); } - static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted, - long seqNoForIndexing, long versionForIndexing) { - return new IndexingStrategy(currentNotFoundOrDeleted, false, - false, seqNoForIndexing, versionForIndexing, null); + static IndexingStrategy processAsStaleOp(boolean addStaleOpToLucene, long seqNoForIndexing, long versionForIndexing) { + return new IndexingStrategy(false, false, false, addStaleOpToLucene, seqNoForIndexing, versionForIndexing, null); } } @@ -1096,7 +1112,7 @@ public DeleteResult delete(Delete delete) throws IOException { if (plan.earlyResultOnPreflightError.isPresent()) { deleteResult = plan.earlyResultOnPreflightError.get(); - } else if (plan.deleteFromLucene) { + } else if (plan.deleteFromLucene || plan.addStaleOpToLucene) { deleteResult = deleteInLucene(delete, plan); } else { deleteResult = new DeleteResult( @@ -1152,7 +1168,7 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws // unlike the primary, replicas don't really care to about found status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return true for the found flag in favor of code simplicity - final OpVsLuceneDocStatus opVsLucene; + final DeletionStrategy plan; if (delete.seqNo() <= localCheckpointTracker.getCheckpoint()) { // the operation seq# is lower then the current local checkpoint and thus was already put into lucene // this can happen during recovery where older operations are sent from the translog that 
are already @@ -1161,18 +1177,15 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws // question may have been deleted in an out of order op that is not replayed. // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery // See testRecoveryWithOutOfOrderDelete for an example of peer recovery - opVsLucene = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; - } else { - opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); - } - - final DeletionStrategy plan; - if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { plan = DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version()); } else { - plan = DeletionStrategy.processNormally( - opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, - delete.seqNo(), delete.version()); + final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); + if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { + plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version()); + } else { + plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, + delete.seqNo(), delete.version()); + } } return plan; } @@ -1212,25 +1225,29 @@ protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOE private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws IOException { try { - if (plan.currentlyDeleted == false) { - // any exception that comes from this is a either an ACE or a fatal exception there - // can't be any document failures coming from this - if (softDeleteEnabled) { - final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newTombstoneDoc(delete.type(), delete.id()); - assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]"; - tombstone.updateSeqID(plan.seqNoOfDeletion, delete.primaryTerm()); - tombstone.version().setLongValue(plan.versionOfDeletion); - final 
ParseContext.Document doc = tombstone.docs().get(0); - doc.add(softDeleteField); - indexWriter.softUpdateDocument(delete.uid(), doc, softDeleteField); + if (softDeleteEnabled) { + final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newTombstoneDoc(delete.type(), delete.id()); + assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]"; + tombstone.updateSeqID(plan.seqNoOfDeletion, delete.primaryTerm()); + tombstone.version().setLongValue(plan.versionOfDeletion); + final ParseContext.Document doc = tombstone.docs().get(0); + doc.add(softDeleteField); + if (plan.addStaleOpToLucene || plan.currentlyDeleted) { + indexWriter.addDocument(doc); } else { - indexWriter.deleteDocuments(delete.uid()); + indexWriter.softUpdateDocument(delete.uid(), doc, softDeleteField); } + } else if (plan.currentlyDeleted == false) { + // any exception that comes from this is a either an ACE or a fatal exception there + // can't be any document failures coming from this + indexWriter.deleteDocuments(delete.uid()); + } + if (plan.deleteFromLucene) { numDocDeletes.inc(); + versionMap.putDeleteUnderLock(delete.uid().bytes(), + new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(), + engineConfig.getThreadPool().relativeTimeInMillis())); } - versionMap.putDeleteUnderLock(delete.uid().bytes(), - new DeleteVersionValue(plan.versionOfDeletion, plan.seqNoOfDeletion, delete.primaryTerm(), - engineConfig.getThreadPool().relativeTimeInMillis())); return new DeleteResult( plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false); } catch (Exception ex) { @@ -1247,12 +1264,13 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) protected static final class DeletionStrategy { // of a rare double delete final boolean deleteFromLucene; + final boolean addStaleOpToLucene; final boolean currentlyDeleted; final long seqNoOfDeletion; final long versionOfDeletion; final 
Optional earlyResultOnPreflightError; - private DeletionStrategy(boolean deleteFromLucene, boolean currentlyDeleted, + private DeletionStrategy(boolean deleteFromLucene, boolean addStaleOpToLucene, boolean currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion, DeleteResult earlyResultOnPreflightError) { assert (deleteFromLucene && earlyResultOnPreflightError != null) == false : @@ -1260,6 +1278,7 @@ private DeletionStrategy(boolean deleteFromLucene, boolean currentlyDeleted, "deleteFromLucene: " + deleteFromLucene + " earlyResultOnPreFlightError:" + earlyResultOnPreflightError; this.deleteFromLucene = deleteFromLucene; + this.addStaleOpToLucene = addStaleOpToLucene; this.currentlyDeleted = currentlyDeleted; this.seqNoOfDeletion = seqNoOfDeletion; this.versionOfDeletion = versionOfDeletion; @@ -1271,16 +1290,22 @@ static DeletionStrategy skipDueToVersionConflict( VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) { final long unassignedSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; final DeleteResult deleteResult = new DeleteResult(e, currentVersion, unassignedSeqNo, currentlyDeleted == false); - return new DeletionStrategy(false, currentlyDeleted, unassignedSeqNo, Versions.NOT_FOUND, deleteResult); + return new DeletionStrategy(false, false, currentlyDeleted, unassignedSeqNo, Versions.NOT_FOUND, deleteResult); } static DeletionStrategy processNormally(boolean currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion) { - return new DeletionStrategy(true, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null); + return new DeletionStrategy(true, false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null); + + } + public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, + long seqNoOfDeletion, long versionOfDeletion) { + return new DeletionStrategy(false, false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null); } - public static DeletionStrategy processButSkipLucene(boolean 
currentlyDeleted, long seqNoOfDeletion, long versionOfDeletion) { - return new DeletionStrategy(false, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null); + static DeletionStrategy processAsStaleOp(boolean addStaleOpToLucene, boolean currentlyDeleted, + long seqNoOfDeletion, long versionOfDeletion) { + return new DeletionStrategy(false, addStaleOpToLucene, currentlyDeleted, seqNoOfDeletion, versionOfDeletion, null); } } @@ -1930,7 +1955,11 @@ private IndexWriter createWriter() throws IOException { // pkg-private for testing IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { - return new IndexWriter(directory, iwc); + if (Assertions.ENABLED) { + return new AssertingIndexWriter(directory, iwc); + } else { + return new IndexWriter(directory, iwc); + } } private IndexWriterConfig getIndexWriterConfig() { @@ -2288,4 +2317,35 @@ private static Map commitDataAsMap(final IndexWriter indexWriter } return commitData; } + + /** IndexWriter created only when assertions are enabled: asserts that updateDocument(s)/deleteDocuments are called with soft-deletes disabled and softUpdateDocument(s) with soft-deletes enabled, then delegates to super. */ private final class AssertingIndexWriter extends IndexWriter { + AssertingIndexWriter(Directory d, IndexWriterConfig conf) throws IOException { + super(d, conf); + } + @Override + public long updateDocument(Term term, Iterable doc) throws IOException { + assert softDeleteEnabled == false : "Call #updateDocument but soft-deletes is enabled"; + return super.updateDocument(term, doc); + } + @Override + public long updateDocuments(Term delTerm, Iterable> docs) throws IOException { + assert softDeleteEnabled == false : "Call #updateDocuments but soft-deletes is enabled"; + return super.updateDocuments(delTerm, docs); + } + @Override + public long deleteDocuments(Term... terms) throws IOException { + assert softDeleteEnabled == false : "Call #deleteDocuments but soft-deletes is enabled"; + return super.deleteDocuments(terms); + } + @Override + public long softUpdateDocument(Term term, Iterable doc, Field... 
softDeletes) throws IOException { + assert softDeleteEnabled : "Call #softUpdateDocument but soft-deletes is disabled"; + return super.softUpdateDocument(term, doc, softDeletes); + } + @Override + public long softUpdateDocuments(Term term, Iterable> docs, Field... softDeletes) throws IOException { + assert softDeleteEnabled : "Call #softUpdateDocuments but soft-deletes is disabled"; + return super.softUpdateDocuments(term, docs, softDeletes); + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index fa56a6a5ce49b..a61f2b462f616 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -44,10 +44,12 @@ import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.LogByteSizeMergePolicy; import org.apache.lucene.index.LogDocMergePolicy; +import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.PointValues; import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.search.IndexSearcher; @@ -81,6 +83,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; @@ -2935,10 +2938,10 @@ public void testDoubleDeliveryReplicaAppendingAndDeleteOnly() throws IOException } public void 
testDoubleDeliveryReplicaAppendingOnly() throws IOException { - final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), + final Supplier doc = () -> testParsedDocument("1", null, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); - Engine.Index operation = appendOnlyReplica(doc, false, 1, randomIntBetween(0, 5)); - Engine.Index retry = appendOnlyReplica(doc, true, 1, randomIntBetween(0, 5)); + Engine.Index operation = appendOnlyReplica(doc.get(), false, 1, randomIntBetween(0, 5)); + Engine.Index retry = appendOnlyReplica(doc.get(), true, 1, randomIntBetween(0, 5)); // operations with a seq# equal or lower to the local checkpoint are not indexed to lucene // and the version lookup is skipped final boolean belowLckp = operation.seqNo() == 0 && retry.seqNo() == 0; @@ -2977,8 +2980,8 @@ public void testDoubleDeliveryReplicaAppendingOnly() throws IOException { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(1, topDocs.totalHits); } - operation = randomAppendOnly(doc, false, 1); - retry = randomAppendOnly(doc, true, 1); + operation = randomAppendOnly(doc.get(), false, 1); + retry = randomAppendOnly(doc.get(), true, 1); if (randomBoolean()) { Engine.IndexResult indexResult = engine.index(operation); assertNotNull(indexResult.getTranslogLocation()); @@ -3043,6 +3046,7 @@ public void testDoubleDeliveryReplica() throws IOException { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10); assertEquals(1, topDocs.totalHits); } + assertThat(getOperationSeqNoInLucene(engine), contains(20L)); } public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOException { @@ -3511,20 +3515,22 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOExceptio final List operations = new ArrayList<>(); final int numberOfOperations = randomIntBetween(16, 32); - final Document document = testDocumentWithTextField(); final 
AtomicLong sequenceNumber = new AtomicLong(); final Engine.Operation.Origin origin = randomFrom(LOCAL_TRANSLOG_RECOVERY, PEER_RECOVERY, PRIMARY, REPLICA); final LongSupplier sequenceNumberSupplier = origin == PRIMARY ? () -> SequenceNumbers.UNASSIGNED_SEQ_NO : sequenceNumber::getAndIncrement; - document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE)); - final ParsedDocument doc = testParsedDocument("1", null, document, B_1, null); - final Term uid = newUid(doc); + final Supplier doc = () -> { + final Document document = testDocumentWithTextField(); + document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE)); + return testParsedDocument("1", null, document, B_1, null); + }; + final Term uid = newUid("1"); final BiFunction searcherFactory = engine::acquireSearcher; for (int i = 0; i < numberOfOperations; i++) { if (randomBoolean()) { final Engine.Index index = new Engine.Index( uid, - doc, + doc.get(), sequenceNumberSupplier.getAsLong(), 1, i, @@ -4558,6 +4564,49 @@ public void testTrimUnsafeCommits() throws Exception { } } + /** Applies a generated single-document primary history and checks that the seq# of every operation is found in Lucene. */ public void testLuceneHistoryOnPrimary() throws Exception { + final List operations = generateSingleDocHistory(false, + randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 10, 300); + assertOperationHistoryInLucene(operations); + } + + /** Same check for a generated replica history, applied in shuffled order. */ public void testLuceneHistoryOnReplica() throws Exception { + final List operations = generateSingleDocHistory(true, + randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 10, 300); + Randomness.shuffle(operations); + assertOperationHistoryInLucene(operations); + } + + /** Runs the operations against a fresh engine whose merge policy is wrapped in a SoftDeletesRetentionMergePolicy with a match-all retention query, then asserts the seq#s read back from Lucene equal the seq#s returned by the operations. */ private void assertOperationHistoryInLucene(List operations) throws IOException { + final MergePolicy keepSoftDeleteDocsMP = new SoftDeletesRetentionMergePolicy( + Lucene.SOFT_DELETE_FIELD, () -> new MatchAllDocsQuery(), engine.config().getMergePolicy()); + Set expectedSeqNos = new HashSet<>(); + try (Store 
store = createStore(); + Engine engine = createEngine(config(defaultSettings, store, createTempDir(), keepSoftDeleteDocsMP, null))) { + for (Engine.Operation op : operations) { + if (op instanceof Engine.Index) { + Engine.IndexResult indexResult = engine.index((Engine.Index) op); + assertThat(indexResult.getFailure(), nullValue()); + expectedSeqNos.add(indexResult.getSeqNo()); + } else { + Engine.DeleteResult deleteResult = engine.delete((Engine.Delete) op); + assertThat(deleteResult.getFailure(), nullValue()); + expectedSeqNos.add(deleteResult.getSeqNo()); + } + if (rarely()) { + engine.refresh("test"); + } + if (rarely()) { + engine.flush(); + } + if (rarely()) { + engine.forceMerge(true); + } + } + assertThat(getOperationSeqNoInLucene(engine), containsInAnyOrder(expectedSeqNos.toArray())); + } + } + private static void trimUnsafeCommits(EngineConfig config) throws IOException { final Store store = config.getStore(); final TranslogConfig translogConfig = config.getTranslogConfig(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index dfe83cb892474..0c394945bcbf3 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -61,6 +61,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -3089,4 +3090,30 @@ public void testSupplyTombstoneDoc() throws Exception { assertThat(doc.getField(IdFieldMapper.NAME).binaryValue(), equalTo(Uid.encodeId(id))); closeShards(shard); } + + /** With soft-deletes enabled, a deleted doc is not matched by the regular searcher but is still matched through Lucene.wrapAllDocsLive. */ public void testSearcherIncludesSoftDeletes() throws Exception { + 
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .build(); + IndexMetaData metaData = IndexMetaData.builder("test") + .putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1).build(); + IndexShard shard = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null); + recoverShardFromStore(shard); + indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}"); + indexDoc(shard, "test", "1", "{\"foo\" : \"baz\"}"); + deleteDoc(shard, "test", "0"); + shard.refresh("test"); + try (Engine.Searcher searcher = shard.acquireSearcher("test")) { + IndexSearcher searchWithSoftDeletes = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader())); + assertThat(searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10).totalHits, equalTo(0L)); + assertThat(searchWithSoftDeletes.search(new TermQuery(new Term("foo", "bar")), 10).totalHits, equalTo(1L)); + assertThat(searcher.searcher().search(new TermQuery(new Term("foo", "baz")), 10).totalHits, equalTo(1L)); + assertThat(searchWithSoftDeletes.search(new TermQuery(new Term("foo", "baz")), 10).totalHits, equalTo(1L)); + } + closeShards(shard); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 9cb3c7e98bfa5..57ebf90efcf12 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -28,18 +28,22 @@ import org.apache.lucene.document.TextField; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import 
org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.ReaderUtil; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.Sort; -import org.apache.lucene.search.TotalHitCountCollector; import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TotalHitCountCollector; import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -53,6 +57,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; @@ -88,7 +93,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.LongSupplier; @@ -551,6 +558,7 @@ public static List generateSingleDocHistory( } else { startWithSeqNo = 0; } + final int seqNoGap = randomBoolean() ? 1 : 2; final String valuePrefix = forReplica ? 
"r_" : "p_"; final boolean incrementTermWhenIntroducingSeqNo = randomBoolean(); for (int i = 0; i < numOfOps; i++) { @@ -574,7 +582,7 @@ public static List generateSingleDocHistory( } if (randomBoolean()) { op = new Engine.Index(id, testParsedDocument("1", null, testDocumentWithTextField(valuePrefix + i), B_1, null), - forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, + forReplica && i >= startWithSeqNo ? i * seqNoGap : SequenceNumbers.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, version, forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType, @@ -583,7 +591,7 @@ public static List generateSingleDocHistory( ); } else { op = new Engine.Delete("test", "1", id, - forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, + forReplica && i >= startWithSeqNo ? i * seqNoGap : SequenceNumbers.UNASSIGNED_SEQ_NO, forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, version, forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType, @@ -664,6 +672,33 @@ public static void assertOpsOnReplica( } } + /** + * Returns the set of sequence numbers of all existing documents including soft-deleted documents in Lucene. Throws an AssertionError if a matched document has no seq# doc value. 
+ */ + public static Set getOperationSeqNoInLucene(Engine engine) throws IOException { + engine.refresh("test"); + final Set seqNos = new HashSet<>(); + try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { + IndexSearcher indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader())); + List leaves = indexSearcher.getIndexReader().leaves(); + NumericDocValues[] seqNoDocValues = new NumericDocValues[leaves.size()]; + for (int i = 0; i < leaves.size(); i++) { + seqNoDocValues[i] = leaves.get(i).reader().getNumericDocValues(SeqNoFieldMapper.NAME); + } + TopDocs allDocs = indexSearcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE); + for (ScoreDoc scoreDoc : allDocs.scoreDocs) { + /* translate the top-level doc id into its leaf and the leaf-local doc id */ int leafIndex = ReaderUtil.subIndex(scoreDoc.doc, leaves); + int segmentDocId = scoreDoc.doc - leaves.get(leafIndex).docBase; + if (seqNoDocValues[leafIndex] != null && seqNoDocValues[leafIndex].advanceExact(segmentDocId)) { + seqNos.add(seqNoDocValues[leafIndex].longValue()); + } else { + throw new AssertionError("Segment without seqno DocValues"); + } + } + } + return seqNos; + } + /** * Exposes a translog associated with the given engine for testing purpose. */