Ensure LuceneChangesSnapshot reads in leaf order (#31246)
Today we re-initialize DV instances while we read docs
for the snapshot. This is caused by the fact that we sort the
docs by seqID, which causes them to be out of docID order. This change
sorts the documents temporarily by docID, fetches the metadata (not the source)
into an in-memory data structure, and sorts them back into seq# order.
This allows efficient reuse of DV instances.
s1monw authored Jun 12, 2018
1 parent 9feff98 commit b4469f8
Showing 1 changed file with 88 additions and 47 deletions.
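
To make the pattern described above concrete, here is a minimal, self-contained sketch in plain Java (SortedFetchSketch, Hit and fetchMetadata are hypothetical stand-ins, not Elasticsearch or Lucene classes): hits arrive sorted by seq#, are tagged with their original position, re-sorted by docID so a forward-only reader could be reused while the metadata is fetched, and finally restored to seq# order.

    import java.util.Arrays;
    import java.util.Comparator;

    public class SortedFetchSketch {

        static final class Hit {
            final int doc;     // global docID
            int originalIndex; // position in the seq#-sorted results
            Hit(int doc) { this.doc = doc; }
        }

        // hypothetical stand-in for reading one numeric doc value
        static long fetchMetadata(int doc) {
            return doc * 2L;
        }

        /** Fetches metadata for every hit while visiting docIDs in increasing order. */
        static long[] loadInDocIdOrder(Hit[] hits) {
            for (int i = 0; i < hits.length; i++) {
                hits[i].originalIndex = i;                          // remember seq# order
            }
            Arrays.sort(hits, Comparator.comparingInt(h -> h.doc)); // visit in docID order
            long[] metadata = new long[hits.length];
            for (Hit hit : hits) {
                // docIDs only ever increase here, so a forward-only doc-values
                // iterator could be advanced instead of re-created
                metadata[hit.originalIndex] = fetchMetadata(hit.doc);
            }
            Arrays.sort(hits, Comparator.comparingInt(h -> h.originalIndex)); // restore seq# order
            return metadata;
        }

        public static void main(String[] args) {
            Hit[] hits = { new Hit(42), new Hit(7), new Hit(19) };
            System.out.println(Arrays.toString(loadInDocIdOrder(hits))); // [84, 14, 38]
        }
    }

The actual change below applies the same idea, using ScoreDoc.shardIndex as the position tag and a ParallelArray as the metadata store.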
@@ -23,16 +23,15 @@
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.index.VersionType;
@@ -47,6 +46,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;

@@ -67,9 +67,8 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
private int docIndex = 0;
private final int totalHits;
private ScoreDoc[] scoreDocs;

private final ParallelArray parallelArray;
private final Closeable onClose;
private final CombinedDocValues[] docValues; // Cache of DocValues

/**
* Creates a new "translog" snapshot from Lucene for reading operations whose seq# is in the specified range.
@@ -97,15 +96,13 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
this.requiredFullRange = requiredFullRange;
this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
this.indexSearcher.setQueryCache(null);
this.parallelArray = new ParallelArray(searchBatchSize);
final TopDocs topDocs = searchOperations(null);

this.totalHits = Math.toIntExact(topDocs.totalHits);
this.scoreDocs = topDocs.scoreDocs;
final List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
this.docValues = new CombinedDocValues[leaves.size()];
for (LeafReaderContext leaf : leaves) {
this.docValues[leaf.ord] = new CombinedDocValues(leaf.reader());
}
this.onClose = engineSearcher;
fillParallelArray(scoreDocs, parallelArray);
}

@Override
@@ -126,8 +123,8 @@ public int overriddenOperations() {
@Override
public Translog.Operation next() throws IOException {
Translog.Operation op = null;
for (int docId = nextDocId(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = nextDocId()) {
op = readDocAsOp(docId);
for (int idx = nextDocIndex(); idx != -1; idx = nextDocIndex()) {
op = readDocAsOp(idx);
if (op != null) {
break;
}
@@ -156,19 +153,58 @@ private void rangeCheck(Translog.Operation op) {
}
}

private int nextDocId() throws IOException {
private int nextDocIndex() throws IOException {
// we have processed all docs in the current search - fetch the next batch
if (docIndex == scoreDocs.length && docIndex > 0) {
final ScoreDoc prev = scoreDocs[scoreDocs.length - 1];
scoreDocs = searchOperations(prev).scoreDocs;
fillParallelArray(scoreDocs, parallelArray);
docIndex = 0;
}
if (docIndex < scoreDocs.length) {
int docId = scoreDocs[docIndex].doc;
int idx = docIndex;
docIndex++;
return docId;
return idx;
}
return -1;
}

private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray) throws IOException {
if (scoreDocs.length > 0) {
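// shardIndex is otherwise unused in this per-shard context, so we borrow it
// below to remember each hit's original (seq#-sorted) position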
for (int i = 0; i < scoreDocs.length; i++) {
scoreDocs[i].shardIndex = i;
}
// for better loading performance we sort the array by docID and
// then visit all leaves in order.
ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.doc));
int docBase = -1;
int maxDoc = 0;
List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
int readerIndex = 0;
CombinedDocValues combinedDocValues = null;
LeafReaderContext leaf = null;
for (int i = 0; i < scoreDocs.length; i++) {
ScoreDoc scoreDoc = scoreDocs[i];
if (scoreDoc.doc >= docBase + maxDoc) {
do {
leaf = leaves.get(readerIndex++);
docBase = leaf.docBase;
maxDoc = leaf.reader().maxDoc();
} while (scoreDoc.doc >= docBase + maxDoc);
combinedDocValues = new CombinedDocValues(leaf.reader());
}
final int segmentDocID = scoreDoc.doc - docBase;
final int index = scoreDoc.shardIndex;
parallelArray.leafReaderContexts[index] = leaf;
parallelArray.seqNo[index] = combinedDocValues.docSeqNo(segmentDocID);
parallelArray.primaryTerm[index] = combinedDocValues.docPrimaryTerm(segmentDocID);
parallelArray.version[index] = combinedDocValues.docVersion(segmentDocID);
parallelArray.isTombStone[index] = combinedDocValues.isTombstone(segmentDocID);
parallelArray.hasRecoverySource[index] = combinedDocValues.hasRecoverySource(segmentDocID);
}
// now sort back based on shardIndex, which we used above to store each hit's original position
ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.shardIndex));
}
return DocIdSetIterator.NO_MORE_DOCS;
}

private TopDocs searchOperations(ScoreDoc after) throws IOException {
@@ -180,31 +216,30 @@ private TopDocs searchOperations(ScoreDoc after) throws IOException {
return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNoThenByTerm);
}

private Translog.Operation readDocAsOp(int docID) throws IOException {
final List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
final LeafReaderContext leaf = leaves.get(ReaderUtil.subIndex(docID, leaves));
final int segmentDocID = docID - leaf.docBase;
final long primaryTerm = docValues[leaf.ord].docPrimaryTerm(segmentDocID);
private Translog.Operation readDocAsOp(int docIndex) throws IOException {
final LeafReaderContext leaf = parallelArray.leafReaderContexts[docIndex];
final int segmentDocID = scoreDocs[docIndex].doc - leaf.docBase;
final long primaryTerm = parallelArray.primaryTerm[docIndex];
// We don't have to read the nested child documents - those docs don't have primary terms.
if (primaryTerm == -1) {
skippedOperations++;
return null;
}
final long seqNo = docValues[leaf.ord].docSeqNo(segmentDocID);
final long seqNo = parallelArray.seqNo[docIndex];
// Only pick the first seen seq#
if (seqNo == lastSeenSeqNo) {
skippedOperations++;
return null;
}
final long version = docValues[leaf.ord].docVersion(segmentDocID);
final String sourceField = docValues[leaf.ord].hasRecoverySource(segmentDocID) ? SourceFieldMapper.RECOVERY_SOURCE_NAME :
final long version = parallelArray.version[docIndex];
final String sourceField = parallelArray.hasRecoverySource[docIndex] ? SourceFieldMapper.RECOVERY_SOURCE_NAME :
SourceFieldMapper.NAME;
final FieldsVisitor fields = new FieldsVisitor(true, sourceField);
indexSearcher.doc(docID, fields);
leaf.reader().document(segmentDocID, fields);
fields.postProcess(mapperService);

final Translog.Operation op;
final boolean isTombstone = docValues[leaf.ord].isTombstone(segmentDocID);
final boolean isTombstone = parallelArray.isTombStone[docIndex];
if (isTombstone && fields.uid() == null) {
op = new Translog.NoOp(seqNo, primaryTerm, fields.source().utf8ToString());
assert version == 1L : "Noop tombstone should have version 1L; actual version [" + version + "]";
@@ -237,16 +272,32 @@ private boolean assertDocSoftDeleted(LeafReader leafReader, int segmentDocId) throws IOException {
return ndv.longValue() == 1;
}

private static final class ParallelArray {
final LeafReaderContext[] leafReaderContexts;
final long[] version;
final long[] seqNo;
final long[] primaryTerm;
final boolean[] isTombStone;
final boolean[] hasRecoverySource;

ParallelArray(int size) {
version = new long[size];
seqNo = new long[size];
primaryTerm = new long[size];
isTombStone = new boolean[size];
hasRecoverySource = new boolean[size];
leafReaderContexts = new LeafReaderContext[size];
}
}

private static final class CombinedDocValues {
private final LeafReader leafReader;
private NumericDocValues versionDV;
private NumericDocValues seqNoDV;
private NumericDocValues primaryTermDV;
private NumericDocValues tombstoneDV;
private NumericDocValues recoverySource;
private final NumericDocValues versionDV;
private final NumericDocValues seqNoDV;
private final NumericDocValues primaryTermDV;
private final NumericDocValues tombstoneDV;
private final NumericDocValues recoverySource;

CombinedDocValues(LeafReader leafReader) throws IOException {
this.leafReader = leafReader;
this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing");
this.primaryTermDV = Objects.requireNonNull(
@@ -256,19 +307,15 @@ private static final class CombinedDocValues {
}

long docVersion(int segmentDocId) throws IOException {
if (versionDV.docID() > segmentDocId) {
versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
}
assert versionDV.docID() < segmentDocId;
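// docs are now read in docID order per leaf (see fillParallelArray), so the
// iterator only ever moves forward and never needs to be re-created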
if (versionDV.advanceExact(segmentDocId) == false) {
throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found");
}
return versionDV.longValue();
}

long docSeqNo(int segmentDocId) throws IOException {
if (seqNoDV.docID() > segmentDocId) {
seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing");
}
assert seqNoDV.docID() < segmentDocId;
if (seqNoDV.advanceExact(segmentDocId) == false) {
throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found");
}
@@ -279,9 +326,7 @@ long docPrimaryTerm(int segmentDocId) throws IOException {
if (primaryTermDV == null) {
return -1L;
}
if (primaryTermDV.docID() > segmentDocId) {
primaryTermDV = leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
}
assert primaryTermDV.docID() < segmentDocId;
// Use -1 for docs which don't have primary term. The caller considers those docs as nested docs.
if (primaryTermDV.advanceExact(segmentDocId) == false) {
return -1;
@@ -293,19 +338,15 @@ boolean isTombstone(int segmentDocId) throws IOException {
if (tombstoneDV == null) {
return false;
}
if (tombstoneDV.docID() > segmentDocId) {
tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
}
assert tombstoneDV.docID() < segmentDocId;
return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0;
}

boolean hasRecoverySource(int segmentDocId) throws IOException {
if (recoverySource == null) {
return false;
}
if (recoverySource.docID() > segmentDocId) {
recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME);
}
assert recoverySource.docID() < segmentDocId;
return recoverySource.advanceExact(segmentDocId);
}
}
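For background on why leaf order matters here: since Lucene 7, numeric doc values are consumed through a forward-only, iterator-style API, so advancing to a smaller docID is not supported and the only recourse is to obtain a fresh DocValues instance, which is exactly what the removed re-initialization branches did. A minimal sketch of that contract, assuming the field exists on the leaf (readAscending and its field parameter are illustrative, not part of this change):

    import java.io.IOException;
    import org.apache.lucene.index.LeafReader;
    import org.apache.lucene.index.NumericDocValues;

    final class ForwardOnlyRead {
        // reads one value per docID; docIds must be in ascending order
        static long[] readAscending(LeafReader reader, String field, int[] docIds) throws IOException {
            NumericDocValues dv = reader.getNumericDocValues(field); // one instance, reused
            long[] values = new long[docIds.length];
            for (int i = 0; i < docIds.length; i++) {
                // legal only because docIds[i] never decreases; advancing backwards
                // would instead require a fresh getNumericDocValues(field) call
                if (dv.advanceExact(docIds[i])) {
                    values[i] = dv.longValue();
                }
            }
            return values;
        }
    }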
