Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CCR] Read changes from Lucene instead of translog #30120

Merged
merged 24 commits into from
May 9, 2018
Merged
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Use the changes snapshot
dnhatn committed May 5, 2018
commit f86dc1deb7fd8cbda089a871827ce48c9aabeba1
Original file line number Diff line number Diff line change
@@ -2346,20 +2346,15 @@ long getNumDocUpdates() {
return numDocUpdates.count();
}

/**
* Creates a new "translog" snapshot containing changes between <code>minSeqNo</code> and <code>maxSeqNo</code>
* from the Lucene index.
*/
public Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService mapperService,
long minSeqNo, long maxSeqNo) throws IOException {
long minSeqNo, long maxSeqNo, boolean requiredFullRange) throws IOException {
// TODO: Should we defer the refresh until we really need it?
ensureOpen();
if (lastRefreshedCheckpointListener.refreshedLocalCheckpoint.get() < maxSeqNo) {
refresh(source, SearcherScope.INTERNAL);
}
return new LuceneChangesSnapshot(() -> acquireSearcher(source, SearcherScope.INTERNAL),
mapperService, minSeqNo, maxSeqNo, true, () -> {
});
return new LuceneChangesSnapshot(() -> acquireSearcher(source, SearcherScope.INTERNAL), mapperService,
minSeqNo, maxSeqNo, requiredFullRange, () -> { });
}

@Override
Original file line number Diff line number Diff line change
@@ -28,21 +28,14 @@
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
@@ -67,7 +60,6 @@
import org.elasticsearch.index.MapperTestUtils;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
@@ -116,7 +108,6 @@
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;

public abstract class EngineTestCase extends ESTestCase {
@@ -707,76 +698,17 @@ public static void assertOpsOnReplica(
* The returned operations are sorted and de-duplicated, thus each sequence number will be have at most one operation.
*/
public static List<Translog.Operation> readAllOperationsInLucene(Engine engine, MapperService mapper) throws IOException {
engine.refresh("test");
final List<Translog.Operation> operations = new ArrayList<>();
try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
final IndexSearcher indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
final Sort sortedBySeqNoThenByTerm = new Sort(
new SortedNumericSortField(SeqNoFieldMapper.NAME, SortField.Type.LONG),
new SortedNumericSortField(SeqNoFieldMapper.PRIMARY_TERM_NAME, SortField.Type.LONG, true)
);
final TopDocs allDocs = indexSearcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE, sortedBySeqNoThenByTerm);
long lastSeenSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
for (ScoreDoc scoreDoc : allDocs.scoreDocs) {
final Translog.Operation op = readOperationInLucene(indexSearcher, mapper, scoreDoc.doc);
if (op.seqNo() != lastSeenSeqNo) {
operations.add(op);
lastSeenSeqNo = op.seqNo();
}
long maxSeqNo = Math.max(0, engine.getLocalCheckpointTracker().getMaxSeqNo());
try (Translog.Snapshot snapshot = engine.newLuceneChangesSnapshot("test", mapper, 0, maxSeqNo, false)) {
Translog.Operation op;
while ((op = snapshot.next()) != null){
operations.add(op);
}
}
return operations;
}

private static Translog.Operation readOperationInLucene(IndexSearcher searcher, MapperService mapper, int docID) throws IOException {
final List<LeafReaderContext> leaves = searcher.getIndexReader().leaves();
final int leafIndex = ReaderUtil.subIndex(docID, leaves);
final int segmentDocID = docID - leaves.get(leafIndex).docBase;
final long seqNo = readNumericDV(leaves.get(leafIndex), SeqNoFieldMapper.NAME, segmentDocID);
final long primaryTerm = readNumericDV(leaves.get(leafIndex), SeqNoFieldMapper.PRIMARY_TERM_NAME, segmentDocID);
final FieldsVisitor fields = new FieldsVisitor(true);
searcher.doc(docID, fields);
fields.postProcess(mapper);
final Translog.Operation op;
final boolean isTombstone = isTombstoneOperation(leaves.get(leafIndex), segmentDocID);
if (isTombstone && fields.uid() == null) {
op = new Translog.NoOp(seqNo, primaryTerm, "");
assert readNumericDV(leaves.get(leafIndex), Lucene.SOFT_DELETE_FIELD, segmentDocID) == 1
: "Noop operation but soft_deletes field is not set";
} else {
final String id = fields.uid().id();
final String type = fields.uid().type();
final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
final long version = readNumericDV(leaves.get(leafIndex), VersionFieldMapper.NAME, segmentDocID);
if (isTombstone) {
op = new Translog.Delete(type, id, uid, seqNo, primaryTerm, version, VersionType.INTERNAL);
assert readNumericDV(leaves.get(leafIndex), Lucene.SOFT_DELETE_FIELD, segmentDocID) == 1
: "Delete operation but soft_deletes field is not set";
} else {
final BytesReference source = fields.source();
op = new Translog.Index(type, id, seqNo, primaryTerm, version, VersionType.INTERNAL, source.toBytesRef().bytes,
fields.routing(), -1);
}
}
return op;
}

private static boolean isTombstoneOperation(LeafReaderContext leaf, int segmentDocID) throws IOException {
final NumericDocValues tombstoneDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
if (tombstoneDV != null && tombstoneDV.advanceExact(segmentDocID)) {
return tombstoneDV.longValue() == 1;
}
return false;
}

private static long readNumericDV(LeafReaderContext leaf, String field, int segmentDocID) throws IOException {
final NumericDocValues dv = leaf.reader().getNumericDocValues(field);
if (dv == null || dv.advanceExact(segmentDocID) == false) {
throw new IllegalStateException("DocValues for field [" + field + "] is not found");
}
return dv.longValue();
}

/**
* Asserts the provided engine has a consistent document history between translog and Lucene index.
*/