Skip to content

Commit

Permalink
Accept duplicate stale docs
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Apr 25, 2018
1 parent 5f6e851 commit 70d5359
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,20 @@ public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context)
* {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found
* */
private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
int docID = DocIdSetIterator.NO_MORE_DOCS;
final PostingsEnum docsEnum = getPostingsOrNull(id);
if (docsEnum != null) {
if (termsEnum.seekExact(id)) {
int docID = DocIdSetIterator.NO_MORE_DOCS;
// there may be more than one matching docID, in the case of nested docs, so we want the last one:
docsEnum = termsEnum.postings(docsEnum, 0);
for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
if (liveDocs != null && liveDocs.get(d) == false) {
continue;
}
docID = d;
}
return docID;
} else {
return DocIdSetIterator.NO_MORE_DOCS;
}
return docID;
}

/** Return null if id is not found. */
Expand All @@ -143,15 +145,4 @@ DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOExcep
return null;
}
}

/**
* Returns an internal posting list of the given uid
*/
PostingsEnum getPostingsOrNull(BytesRef id) throws IOException {
if (termsEnum.seekExact(id)) {
docsEnum = termsEnum.postings(docsEnum, 0);
return docsEnum;
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
Expand Down Expand Up @@ -195,34 +193,4 @@ public static long loadVersion(IndexReader reader, Term term) throws IOException
final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term);
return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version;
}

/**
* Checks for the existence of the history of a pair SeqNo/PrimaryTerm in Lucene. The checking pair is considered as existed
* if there is a pair such as the seqNo equals to the checking seqNo and the primary term is at least the checking term.
*/
public static boolean hasHistoryInLucene(IndexReader reader, Term idTerm, long seqNo, long primaryTerm) throws IOException {
final PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, idTerm.field());
final List<LeafReaderContext> leaves = reader.leaves();
// iterate backwards to optimize for the frequently updated documents which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
final LeafReaderContext leaf = leaves.get(i);
final PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
final PostingsEnum postingsEnum = lookup.getPostingsOrNull(idTerm.bytes());
if (postingsEnum == null) {
continue;
}
final NumericDocValues seqNoDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
assert seqNoDV != null : "SeqNoDV does not exist";
final NumericDocValues primaryTermDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
assert primaryTermDV != null : "PrimaryTermDV does not exist";
for (int docId = postingsEnum.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = postingsEnum.nextDoc()) {
if (seqNoDV.advanceExact(docId) && primaryTermDV.advanceExact(docId)) {
if (seqNoDV.longValue() == seqNo && primaryTermDV.longValue() >= primaryTerm) {
return true;
}
}
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -580,35 +580,23 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> search
enum OpVsLuceneDocStatus {
/** the op is more recent than the one that last modified the doc found in lucene*/
OP_NEWER,
/** the op is stale but its history is existed in Lucene */
OP_STALE_HISTORY_EXISTS,
/** the op is stale and its history is not found in Lucene */
OP_STALE_HISTORY_NOT_FOUND,
/** the op is older or the same as the one that last modified the doc found in lucene*/
OP_STALE_OR_EQUAL,
/** no doc was found in lucene */
LUCENE_DOC_NOT_FOUND
}

private OpVsLuceneDocStatus compareToLuceneHistory(final Operation op, final Searcher searcher) throws IOException {
if (VersionsAndSeqNoResolver.hasHistoryInLucene(searcher.reader(), op.uid(), op.seqNo(), op.primaryTerm())) {
return OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS;
} else {
return OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND;
}
}

private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException {
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found";
final OpVsLuceneDocStatus status;
VersionValue versionValue = getVersionFromMap(op.uid().bytes());
assert incrementVersionLookup();
if (versionValue != null) {
if (op.seqNo() > versionValue.seqNo ||
(op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term)) {
if (op.seqNo() > versionValue.seqNo ||
(op.seqNo() == versionValue.seqNo && op.primaryTerm() > versionValue.term))
status = OpVsLuceneDocStatus.OP_NEWER;
} else if (op.seqNo() == versionValue.seqNo && op.primaryTerm() == versionValue.term) {
status = OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS;
} else {
status = OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND;
else {
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
}
} else {
// load from index
Expand All @@ -625,10 +613,10 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
if (op.primaryTerm() > existingTerm) {
status = OpVsLuceneDocStatus.OP_NEWER;
} else {
status = compareToLuceneHistory(op, searcher);
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
}
} else {
status = compareToLuceneHistory(op, searcher);
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
}
}
}
Expand Down Expand Up @@ -852,7 +840,6 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
// unlike the primary, replicas don't really care to about creation status of documents
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return false for the created flag in favor of code simplicity
final OpVsLuceneDocStatus opVsLucene;
if (index.seqNo() <= localCheckpointTracker.getCheckpoint()){
// the operation seq# is lower then the current local checkpoint and thus was already put into lucene
// this can happen during recovery where older operations are sent from the translog that are already
Expand All @@ -861,18 +848,15 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
// question may have been deleted in an out of order op that is not replayed.
// See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
// See testRecoveryWithOutOfOrderDelete for an example of peer recovery
opVsLucene = OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS;
} else {
opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
}
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS) {
plan = IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());
} else if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND) {
plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version());
} else {
plan = IndexingStrategy.processNormally(
opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.seqNo(), index.version()
);
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version());
} else {
plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
index.seqNo(), index.version());
}
}
}
return plan;
Expand Down Expand Up @@ -1184,7 +1168,7 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws
// unlike the primary, replicas don't really care to about found status of documents
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return true for the found flag in favor of code simplicity
final OpVsLuceneDocStatus opVsLucene;
final DeletionStrategy plan;
if (delete.seqNo() <= localCheckpointTracker.getCheckpoint()) {
// the operation seq# is lower then the current local checkpoint and thus was already put into lucene
// this can happen during recovery where older operations are sent from the translog that are already
Expand All @@ -1193,21 +1177,15 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws
// question may have been deleted in an out of order op that is not replayed.
// See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
// See testRecoveryWithOutOfOrderDelete for an example of peer recovery
opVsLucene = OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS;
} else {
opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
}

final DeletionStrategy plan;

if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_EXISTS) {
plan = DeletionStrategy.processButSkipLucene(false, delete.seqNo(), delete.version());
} else if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_HISTORY_NOT_FOUND) {
plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version());
} else {
plan = DeletionStrategy.processNormally(
opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
delete.seqNo(), delete.version());
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version());
} else {
plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
delete.seqNo(), delete.version());
}
}
return plan;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4605,9 +4605,7 @@ private void assertOperationHistoryInLucene(List<Engine.Operation> operations) t
engine.forceMerge(true);
}
}
List<Long> operationsInLucene = getOperationSeqNoInLucene(engine);
assertThat(operationsInLucene, hasSize(expectedSeqNos.size()));
assertThat(operationsInLucene, containsInAnyOrder(expectedSeqNos.toArray()));
assertThat(getOperationSeqNoInLucene(engine), containsInAnyOrder(expectedSeqNos.toArray()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,11 @@
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand All @@ -58,6 +57,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
Expand Down Expand Up @@ -93,7 +93,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -673,9 +675,9 @@ public static void assertOpsOnReplica(
/**
* Returns a list of sequence numbers of all existing documents including soft-deleted documents in Lucene.
*/
public static List<Long> getOperationSeqNoInLucene(Engine engine) throws IOException {
public static Set<Long> getOperationSeqNoInLucene(Engine engine) throws IOException {
engine.refresh("test");
final List<Long> seqNos = new ArrayList<>();
final Set<Long> seqNos = new HashSet<>();
try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
IndexSearcher indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
Expand Down

0 comments on commit 70d5359

Please sign in to comment.