Use a _recovery_source if source is omitted or modified #31106

Merged: 11 commits, Jun 7, 2018
InternalEngine.java
@@ -72,6 +72,7 @@
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
@@ -2013,7 +2014,8 @@ private IndexWriterConfig getIndexWriterConfig() {
MergePolicy mergePolicy = config().getMergePolicy();
if (softDeleteEnabled) {
iwc.setSoftDeletesField(Lucene.SOFT_DELETE_FIELD);
-mergePolicy = new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, this::softDeletesRetentionQuery, mergePolicy);
+mergePolicy = new RecoverySourcePruneMergePolicy(SourceFieldMapper.RECOVERY_SOURCE_NAME, this::recoverySourcePruneQuery,
+    new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, this::softDeletesRetentionQuery, mergePolicy));
}
iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy));
iwc.setSimilarity(engineConfig.getSimilarity());
@@ -2040,6 +2042,13 @@ private Query softDeletesRetentionQuery() {
return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, persistedGlobalCheckpoint + 1 - retainedExtraOps, Long.MAX_VALUE);
}

private Query recoverySourcePruneQuery() {
ensureOpen();
final long retainedExtraOps = engineConfig.getIndexSettings().getSoftDeleteRetentionOperations();
final long persistedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Long.MIN_VALUE, persistedGlobalCheckpoint - retainedExtraOps);
}
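
For intuition, softDeletesRetentionQuery and recoverySourcePruneQuery carve the sequence-number space into complementary ranges: everything inside the retention window keeps its soft deletes, and everything strictly below it becomes eligible for recovery-source pruning. A minimal standalone sketch, not part of this PR ("_seq_no" stands in for SeqNoFieldMapper.NAME and the numbers are invented):

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.Query;

class RetentionWindowSketch {
    public static void main(String[] args) {
        long globalCheckpoint = 1000; // translog.getLastSyncedGlobalCheckpoint()
        long retainedExtraOps = 200;  // index.soft_deletes.retention.operations
        // Soft deletes retained for: [globalCheckpoint + 1 - retainedExtraOps, MAX] = [801, MAX]
        Query retain = LongPoint.newRangeQuery("_seq_no",
            globalCheckpoint + 1 - retainedExtraOps, Long.MAX_VALUE);
        // Recovery source prunable for: [MIN, globalCheckpoint - retainedExtraOps] = [MIN, 800]
        Query prune = LongPoint.newRangeQuery("_seq_no",
            Long.MIN_VALUE, globalCheckpoint - retainedExtraOps);
        System.out.println(retain);
        System.out.println(prune);
    }
}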

/** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */
static final class SearchFactory extends EngineSearcherFactory {
private final Engine.Warmer warmer;

LuceneChangesSnapshot.java
@@ -40,6 +40,7 @@
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.translog.Translog;
@@ -196,7 +197,9 @@ private Translog.Operation readDocAsOp(int docID) throws IOException {
return null;
}
final long version = docValues[leaf.ord].docVersion(segmentDocID);
-final FieldsVisitor fields = new FieldsVisitor(true);
+final String sourceField = docValues[leaf.ord].hasRecoverySource(segmentDocID) ? SourceFieldMapper.RECOVERY_SOURCE_NAME :
+    SourceFieldMapper.NAME;
+final FieldsVisitor fields = new FieldsVisitor(true, sourceField);
indexSearcher.doc(docID, fields);
fields.postProcess(mapperService);

@@ -240,6 +243,7 @@ private static final class CombinedDocValues {
private NumericDocValues seqNoDV;
private NumericDocValues primaryTermDV;
private NumericDocValues tombstoneDV;
private NumericDocValues recoverySource;

CombinedDocValues(LeafReader leafReader) throws IOException {
this.leafReader = leafReader;
@@ -248,6 +252,7 @@
this.primaryTermDV = Objects.requireNonNull(
leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME), "PrimaryTermDV is missing");
this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
this.recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME);
}

long docVersion(int segmentDocId) throws IOException {
@@ -293,5 +298,15 @@ boolean isTombstone(int segmentDocId) throws IOException {
}
return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0;
}

boolean hasRecoverySource(int segmentDocId) throws IOException {
if (recoverySource == null) {
return false;
}
if (tombstoneDV.docID() > segmentDocId) {
// REVIEW COMMENT (Member): tombstoneDV -> recoverySource
recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME);
}
return tombstoneDV.advanceExact(segmentDocId);
// REVIEW COMMENT (Member): Same here, tombstoneDV -> recoverySource

}
}
}
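
For reference, applying both review comments (tombstoneDV -> recoverySource) would leave the method looking like this sketch, mirroring the iterator-reacquisition pattern isTombstone already uses:

boolean hasRecoverySource(int segmentDocId) throws IOException {
    if (recoverySource == null) {
        return false;
    }
    if (recoverySource.docID() > segmentDocId) {
        // doc-values iterators are forward-only: re-acquire if we already advanced past this doc
        recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME);
    }
    return recoverySource.advanceExact(segmentDocId);
}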
RecoverySourcePruneMergePolicy.java (new file)
@@ -0,0 +1,266 @@
package org.elasticsearch.index.engine;

import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FilterCodecReader;
import org.apache.lucene.index.FilterNumericDocValues;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.OneMergeWrappingMergePolicy;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.BitSet;

import java.io.IOException;
import java.util.function.Supplier;

final class RecoverySourcePruneMergePolicy extends OneMergeWrappingMergePolicy {
RecoverySourcePruneMergePolicy(String recoverySourceField, Supplier<Query> retentionPolicySupplier, MergePolicy in) {
// REVIEW COMMENT (Member): retentionPolicySupplier is confusing. It's a prune query supplier.
super(in, toWrap -> new OneMerge(toWrap.segments) {
@Override
public CodecReader wrapForMerge(CodecReader reader) throws IOException {
CodecReader wrapped = toWrap.wrapForMerge(reader);
NumericDocValues recovery_source = wrapped.getNumericDocValues(recoverySourceField);
if (recovery_source == null || recovery_source.nextDoc() == DocIdSetIterator.NO_MORE_DOCS) {
return wrapped;
}
return wrapReader(recoverySourceField, wrapped, retentionPolicySupplier);
}
});

}

// pkg private for testing
static CodecReader wrapReader(String recoverySourceField, CodecReader reader, Supplier<Query> retentionPolicySupplier)
throws IOException {
// REVIEW COMMENT (Member): Same here: retentionPolicySupplier
BooleanQuery.Builder builder = new BooleanQuery.Builder();
builder.add(new DocValuesFieldExistsQuery(recoverySourceField), BooleanClause.Occur.FILTER);
builder.add(retentionPolicySupplier.get(), BooleanClause.Occur.FILTER);
IndexSearcher s = new IndexSearcher(reader);
s.setQueryCache(null);
Weight weight = s.createWeight(builder.build(), false, 1.0f);
Scorer scorer = weight.scorer(reader.getContext());

if (scorer != null) {
BitSet sourceToDrop = BitSet.of(scorer.iterator(), reader.maxDoc());
return new SourcePruningFilterCodecReader(recoverySourceField, reader, sourceToDrop);
} else {
return reader;
}
}
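
In other words, wrapReader marks exactly those documents that still carry recovery-source doc values and whose seq_no matches the prune query supplied by the engine. A summary of the two-pronged pruning that follows, as added commentary (not text from the PR):

// sourceToDrop = docs(recovery source doc values exist) AND docs(prune query matches).
// For every doc in sourceToDrop, SourcePruningFilterCodecReader then
//   (1) hides the numeric doc values for the field (getDocValuesReader below), and
//   (2) filters the stored field out during the merge (getFieldsReader below),
// so merged segments no longer carry the pruned recovery source at all.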

private static class SourcePruningFilterCodecReader extends FilterCodecReader {
private final BitSet sourceToDrop;
private final String recoverySourceField;

SourcePruningFilterCodecReader(String recoverySourceField, CodecReader reader, BitSet sourceToDrop) {
super(reader);
this.recoverySourceField = recoverySourceField;
this.sourceToDrop = sourceToDrop;
}

@Override
public DocValuesProducer getDocValuesReader() {
DocValuesProducer docValuesReader = super.getDocValuesReader();
return new FilterDocValuesProducer(docValuesReader) {
@Override
public NumericDocValues getNumeric(FieldInfo field) throws IOException {
NumericDocValues numeric = super.getNumeric(field);
if (recoverySourceField.equals(field.name)) {
return new FilterNumericDocValues(numeric) {
// REVIEW COMMENT (Contributor): maybe leave comments about why we don't need to check
// whether numeric and docValuesReader are null
// REVIEW COMMENT (Contributor): If recoverySourceToKeep was a bitset, we could do a
// leap frog, which would be faster if recoverySourceToKeep is sparse:
//
//     final ConjunctionDISI intersection = ConjunctionDISI.intersectIterators(
//         Arrays.asList(numeric, new BitSetIterator(recoverySourceToKeep)));
//     return new FilterNumericDocValues(numeric) {
//         @Override
//         public int nextDoc() throws IOException {
//             return intersection.nextDoc();
//         }
//     };

@Override
public int nextDoc() throws IOException {
int doc;
do {
doc = super.nextDoc();
} while (doc != NO_MORE_DOCS && sourceToDrop.get(doc));
return doc;
}

@Override
public int advance(int target) {
throw new UnsupportedOperationException();
}

@Override
public boolean advanceExact(int target) {
throw new UnsupportedOperationException();
}
};
}
return numeric;
}
};
}
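
One way to answer the first review comment above, as suggested comments only (the reasoning is inferred from the constructor's early return, not stated in the PR):

// Why no null checks are needed here:
// - wrapForMerge only installs SourcePruningFilterCodecReader after reading
//   non-empty numeric doc values for recoverySourceField from this same reader,
//   so the segment is known to contain the field;
// - hence super.getDocValuesReader() is present, and super.getNumeric(field)
//   cannot return null for the recovery source field.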

@Override
public StoredFieldsReader getFieldsReader() {
StoredFieldsReader fieldsReader = super.getFieldsReader();
return new FilterStoredFieldsReader(fieldsReader) {
@Override
public void visitDocument(int docID, StoredFieldVisitor visitor) throws IOException {
if (sourceToDrop.get(docID)) {
visitor = new FilterStoredFieldVisitor(visitor) {
@Override
public Status needsField(FieldInfo fieldInfo) throws IOException {
if (recoverySourceField.equals(fieldInfo.name)) {
return Status.NO;
}
return super.needsField(fieldInfo);
}
};
}
super.visitDocument(docID, visitor);
}
};
}

@Override
public CacheHelper getCoreCacheHelper() {
return null;
}

@Override
public CacheHelper getReaderCacheHelper() {
return null;
}

private static class FilterDocValuesProducer extends DocValuesProducer {
private final DocValuesProducer in;

FilterDocValuesProducer(DocValuesProducer in) {
this.in = in;
}

@Override
public NumericDocValues getNumeric(FieldInfo field) throws IOException {
return in.getNumeric(field);
}

@Override
public BinaryDocValues getBinary(FieldInfo field) throws IOException {
return in.getBinary(field);
}

@Override
public SortedDocValues getSorted(FieldInfo field) throws IOException {
return in.getSorted(field);
}

@Override
public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException {
return in.getSortedNumeric(field);
}

@Override
public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
return in.getSortedSet(field);
}

@Override
public void checkIntegrity() throws IOException {
in.checkIntegrity();
}

@Override
public void close() throws IOException {
in.close();
}

@Override
public long ramBytesUsed() {
return in.ramBytesUsed();
}
}

private static class FilterStoredFieldsReader extends StoredFieldsReader {

private final StoredFieldsReader fieldsReader;

FilterStoredFieldsReader(StoredFieldsReader fieldsReader) {
this.fieldsReader = fieldsReader;
}

@Override
public long ramBytesUsed() {
return fieldsReader.ramBytesUsed();
}

@Override
public void close() throws IOException {
fieldsReader.close();
}

@Override
public void visitDocument(int docID, StoredFieldVisitor visitor) throws IOException {
fieldsReader.visitDocument(docID, visitor);
}

@Override
public StoredFieldsReader clone() {
return fieldsReader.clone();
}

@Override
public void checkIntegrity() throws IOException {
fieldsReader.checkIntegrity();
}
}

private static class FilterStoredFieldVisitor extends StoredFieldVisitor {
private final StoredFieldVisitor visitor;

FilterStoredFieldVisitor(StoredFieldVisitor visitor) {
this.visitor = visitor;
}

@Override
public void binaryField(FieldInfo fieldInfo, byte[] value) throws IOException {
visitor.binaryField(fieldInfo, value);
}

@Override
public void stringField(FieldInfo fieldInfo, byte[] value) throws IOException {
visitor.stringField(fieldInfo, value);
}

@Override
public void intField(FieldInfo fieldInfo, int value) throws IOException {
visitor.intField(fieldInfo, value);
}

@Override
public void longField(FieldInfo fieldInfo, long value) throws IOException {
visitor.longField(fieldInfo, value);
}

@Override
public void floatField(FieldInfo fieldInfo, float value) throws IOException {
visitor.floatField(fieldInfo, value);
}

@Override
public void doubleField(FieldInfo fieldInfo, double value) throws IOException {
visitor.doubleField(fieldInfo, value);
}

@Override
public Status needsField(FieldInfo fieldInfo) throws IOException {
return visitor.needsField(fieldInfo);
}
}
}
}

FieldsVisitor.java
@@ -54,13 +54,19 @@ public class FieldsVisitor extends StoredFieldVisitor {
RoutingFieldMapper.NAME));

private final boolean loadSource;
private final String sourceFieldName;
private final Set<String> requiredFields;
protected BytesReference source;
protected String type, id;
protected Map<String, List<Object>> fieldsValues;

public FieldsVisitor(boolean loadSource) {
this(loadSource, SourceFieldMapper.NAME);
}

public FieldsVisitor(boolean loadSource, String sourceFieldName) {
this.loadSource = loadSource;
this.sourceFieldName = sourceFieldName;
requiredFields = new HashSet<>();
reset();
}
@@ -103,7 +109,7 @@ public void postProcess(MapperService mapperService) {

@Override
public void binaryField(FieldInfo fieldInfo, byte[] value) throws IOException {
-if (SourceFieldMapper.NAME.equals(fieldInfo.name)) {
+if (sourceFieldName.equals(fieldInfo.name)) {
source = new BytesArray(value);
} else if (IdFieldMapper.NAME.equals(fieldInfo.name)) {
id = Uid.decodeId(value);
@@ -175,7 +181,7 @@ public void reset() {

requiredFields.addAll(BASE_REQUIRED_FIELDS);
if (loadSource) {
-requiredFields.add(SourceFieldMapper.NAME);
+requiredFields.add(sourceFieldName);
}
}
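
Taken together with the LuceneChangesSnapshot change above, a caller now selects the stored source field explicitly. A usage sketch (illustration only, variable names shortened from the snapshot code):

// Pick whichever stored field still carries this document's source:
String sourceField = combinedDocValues.hasRecoverySource(segmentDocID)
    ? SourceFieldMapper.RECOVERY_SOURCE_NAME // "_recovery_source"
    : SourceFieldMapper.NAME;                // "_source"
FieldsVisitor fields = new FieldsVisitor(true, sourceField);
indexSearcher.doc(docID, fields);  // loads id/routing plus the chosen source field
fields.postProcess(mapperService); // resolves field values against the mappings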
