From 70d5359b871ca7f37bff01e885de8d6f78fed98f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 25 Apr 2018 16:30:24 -0400 Subject: [PATCH] Accept duplicate stale docs --- .../uid/PerThreadIDVersionAndSeqNoLookup.java | 21 ++---- .../lucene/uid/VersionsAndSeqNoResolver.java | 32 --------- .../index/engine/InternalEngine.java | 68 +++++++------------ .../index/engine/InternalEngineTests.java | 4 +- .../index/engine/EngineTestCase.java | 10 +-- 5 files changed, 36 insertions(+), 99 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java index 1156f005d5ba6..38fcdfe5f1b62 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java @@ -111,18 +111,20 @@ public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context) * {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found * */ private int getDocID(BytesRef id, Bits liveDocs) throws IOException { - int docID = DocIdSetIterator.NO_MORE_DOCS; - final PostingsEnum docsEnum = getPostingsOrNull(id); - if (docsEnum != null) { + if (termsEnum.seekExact(id)) { + int docID = DocIdSetIterator.NO_MORE_DOCS; // there may be more than one matching docID, in the case of nested docs, so we want the last one: + docsEnum = termsEnum.postings(docsEnum, 0); for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) { if (liveDocs != null && liveDocs.get(d) == false) { continue; } docID = d; } + return docID; + } else { + return DocIdSetIterator.NO_MORE_DOCS; } - return docID; } /** Return null if id is not found. */ @@ -143,15 +145,4 @@ DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOExcep return null; } } - - /** - * Returns an internal posting list of the given uid - */ - PostingsEnum getPostingsOrNull(BytesRef id) throws IOException { - if (termsEnum.seekExact(id)) { - docsEnum = termsEnum.postings(docsEnum, 0); - return docsEnum; - } - return null; - } } diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java index 21609e2926c61..9db7e3716d51a 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java @@ -23,9 +23,7 @@ import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; -import org.apache.lucene.index.PostingsEnum; import org.apache.lucene.index.Term; -import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.util.CloseableThreadLocal; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.mapper.SeqNoFieldMapper; @@ -195,34 +193,4 @@ public static long loadVersion(IndexReader reader, Term term) throws IOException final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term); return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version; } - - /** - * Checks for the existence of the history of a pair SeqNo/PrimaryTerm in Lucene. The checking pair is considered as existed - * if there is a pair such as the seqNo equals to the checking seqNo and the primary term is at least the checking term. - */ - public static boolean hasHistoryInLucene(IndexReader reader, Term idTerm, long seqNo, long primaryTerm) throws IOException { - final PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, idTerm.field()); - final List leaves = reader.leaves(); - // iterate backwards to optimize for the frequently updated documents which are likely to be in the last segments - for (int i = leaves.size() - 1; i >= 0; i--) { - final LeafReaderContext leaf = leaves.get(i); - final PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord]; - final PostingsEnum postingsEnum = lookup.getPostingsOrNull(idTerm.bytes()); - if (postingsEnum == null) { - continue; - } - final NumericDocValues seqNoDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME); - assert seqNoDV != null : "SeqNoDV does not exist"; - final NumericDocValues primaryTermDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); - assert primaryTermDV != null : "PrimaryTermDV does not exist"; - for (int docId = postingsEnum.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = postingsEnum.nextDoc()) { - if (seqNoDV.advanceExact(docId) && primaryTermDV.advanceExact(docId)) { - if (seqNoDV.longValue() == seqNo && primaryTermDV.longValue() >= primaryTerm) { - return true; - } - } - } - } - return false; - } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 216c0c7eae7d8..93b5e1bc8f9e8 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -580,35 +580,23 @@ public GetResult get(Get get, BiFunction search enum OpVsLuceneDocStatus { /** the op is more recent than the one that last modified the doc found in lucene*/ OP_NEWER, - /** the op is stale but its history is existed in Lucene */ - OP_STALE_HISTORY_EXISTS, - /** the op is stale and its history is not found in Lucene */ - OP_STALE_HISTORY_NOT_FOUND, + /** the op is older or the same as the one that last modified the doc found in lucene*/ + OP_STALE_OR_EQUAL, /** no doc was found in lucene */ LUCENE_DOC_NOT_FOUND } - private OpVsLuceneDocStatus compareToLuceneHistory(final Operation op, final Searcher searcher) throws IOException { - if (VersionsAndSeqNoResolver.hasHistoryInLucene(searcher.reader(), op.uid(), op.seqNo(), op.primaryTerm())) { - return OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS; - } else { - return OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND; - } - } - private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException { assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found"; final OpVsLuceneDocStatus status; VersionValue versionValue = getVersionFromMap(op.uid().bytes()); assert incrementVersionLookup(); if (versionValue != null) { - if (op.seqNo() > versionValue.seqNo || - (op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) { + if (op.seqNo() > versionValue.seqNo || + (op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) status = OpVsLuceneDocStatus.OP_NEWER; - } else if (op.seqNo() == versionValue.seqNo && op.primaryTerm() == versionValue.term) { - status = OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS; - } else { - status = OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND; + else { + status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; } } else { // load from index @@ -625,10 +613,10 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) if (op.primaryTerm() > existingTerm) { status = OpVsLuceneDocStatus.OP_NEWER; } else { - status = compareToLuceneHistory(op, searcher); + status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; } } else { - status = compareToLuceneHistory(op, searcher); + status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; } } } @@ -852,7 +840,6 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO // unlike the primary, replicas don't really care to about creation status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return false for the created flag in favor of code simplicity - final OpVsLuceneDocStatus opVsLucene; if (index.seqNo() <= localCheckpointTracker.getCheckpoint()){ // the operation seq# is lower then the current local checkpoint and thus was already put into lucene // this can happen during recovery where older operations are sent from the translog that are already @@ -861,18 +848,15 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO // question may have been deleted in an out of order op that is not replayed. // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery // See testRecoveryWithOutOfOrderDelete for an example of peer recovery - opVsLucene = OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS; - } else { - opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); - } - if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS) { plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); - } else if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND) { - plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version()); } else { - plan = IndexingStrategy.processNormally( - opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.seqNo(), index.version() - ); + final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); + if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { + plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version()); + } else { + plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, + index.seqNo(), index.version()); + } } } return plan; @@ -1184,7 +1168,7 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws // unlike the primary, replicas don't really care to about found status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return true for the found flag in favor of code simplicity - final OpVsLuceneDocStatus opVsLucene; + final DeletionStrategy plan; if (delete.seqNo() <= localCheckpointTracker.getCheckpoint()) { // the operation seq# is lower then the current local checkpoint and thus was already put into lucene // this can happen during recovery where older operations are sent from the translog that are already @@ -1193,21 +1177,15 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws // question may have been deleted in an out of order op that is not replayed. // See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery // See testRecoveryWithOutOfOrderDelete for an example of peer recovery - opVsLucene = OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS; - } else { - opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); - } - - final DeletionStrategy plan; - - if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS) { plan = DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version()); - } else if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND) { - plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version()); } else { - plan = DeletionStrategy.processNormally( - opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, - delete.seqNo(), delete.version()); + final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); + if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { + plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version()); + } else { + plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, + delete.seqNo(), delete.version()); + } } return plan; } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 020c8dd254b72..439e1fd89be23 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4605,9 +4605,7 @@ private void assertOperationHistoryInLucene(List operations) t engine.forceMerge(true); } } - List operationsInLucene = getOperationSeqNoInLucene(engine); - assertThat(operationsInLucene, hasSize(expectedSeqNos.size())); - assertThat(operationsInLucene, containsInAnyOrder(expectedSeqNos.toArray())); + assertThat(getOperationSeqNoInLucene(engine), containsInAnyOrder(expectedSeqNos.toArray())); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index bcb73fe8b32dc..57ebf90efcf12 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -39,12 +39,11 @@ import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.Sort; +import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHitCountCollector; -import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -58,6 +57,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; @@ -93,7 +93,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.LongSupplier; @@ -673,9 +675,9 @@ public static void assertOpsOnReplica( /** * Returns a list of sequence numbers of all existing documents including soft-deleted documents in Lucene. */ - public static List getOperationSeqNoInLucene(Engine engine) throws IOException { + public static Set getOperationSeqNoInLucene(Engine engine) throws IOException { engine.refresh("test"); - final List seqNos = new ArrayList<>(); + final Set seqNos = new HashSet<>(); try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { IndexSearcher indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader())); List leaves = indexSearcher.getIndexReader().leaves();