Skip to content

Commit

Permalink
Return consistent source in updates (#48707)
Browse files Browse the repository at this point in the history
  • Loading branch information
ywelsch authored Oct 31, 2019
1 parent 3e4791a commit 4c75564
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 29 deletions.
19 changes: 11 additions & 8 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ protected final GetResult getFromSearcher(Get get, BiFunction<String, SearcherSc
if (docIdAndVersion != null) {
// don't release the searcher on this path, it is the
// responsibility of the caller to call GetResult.release
return new GetResult(searcher, docIdAndVersion);
return new GetResult(searcher, docIdAndVersion, false);
} else {
Releasables.close(searcher);
return GetResult.NOT_EXISTS;
Expand Down Expand Up @@ -1626,21 +1626,20 @@ public static class GetResult implements Releasable {
private final long version;
private final DocIdAndVersion docIdAndVersion;
private final Engine.Searcher searcher;
private final boolean fromTranslog;

public static final GetResult NOT_EXISTS = new GetResult(false, Versions.NOT_FOUND, null, null);
public static final GetResult NOT_EXISTS = new GetResult(false, Versions.NOT_FOUND, null, null, false);

private GetResult(boolean exists, long version, DocIdAndVersion docIdAndVersion, Engine.Searcher searcher) {
private GetResult(boolean exists, long version, DocIdAndVersion docIdAndVersion, Engine.Searcher searcher, boolean fromTranslog) {
this.exists = exists;
this.version = version;
this.docIdAndVersion = docIdAndVersion;
this.searcher = searcher;
this.fromTranslog = fromTranslog;
}

/**
* Build a non-realtime get result from the searcher.
*/
public GetResult(Engine.Searcher searcher, DocIdAndVersion docIdAndVersion) {
this(true, docIdAndVersion.version, docIdAndVersion, searcher);
public GetResult(Engine.Searcher searcher, DocIdAndVersion docIdAndVersion, boolean fromTranslog) {
this(true, docIdAndVersion.version, docIdAndVersion, searcher, fromTranslog);
}

public boolean exists() {
Expand All @@ -1651,6 +1650,10 @@ public long version() {
return this.version;
}

public boolean isFromTranslog() {
return fromTranslog;
}

public Engine.Searcher searcher() {
return this.searcher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Engine.Searcher>
return new GetResult(new Engine.Searcher("realtime_get", reader,
IndexSearcher.getDefaultSimilarity(), null, IndexSearcher.getDefaultQueryCachingPolicy(), reader),
new VersionsAndSeqNoResolver.DocIdAndVersion(0, index.version(), index.seqNo(), index.primaryTerm(),
reader, 0));
reader, 0), true);
}
} catch (IOException e) {
maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ private GetResult innerGet(String id, String[] gFields, boolean realtime, long v
Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
Engine.GetResult get = indexShard.get(new Engine.Get(realtime, readFromTranslog, id, uidTerm)
.version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
assert get.isFromTranslog() == false || readFromTranslog : "should only read from translog if explicitly enabled";
if (get.exists() == false) {
get.close();
}
Expand Down Expand Up @@ -223,12 +224,22 @@ private GetResult innerGetLoadFromStoredFields(String id, String[] gFields, Fetc

if (!fetchSourceContext.fetchSource()) {
source = null;
} else if (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0) {
}

if (source != null && get.isFromTranslog()) {
// reapply source filters from mapping (possibly also nulling the source)
try {
source = docMapper.sourceMapper().applyFilters(source, null);
} catch (IOException e) {
throw new ElasticsearchException("Failed to reapply filters for [" + id + "] after reading from translog", e);
}
}

if (source != null && (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0)) {
Map<String, Object> sourceAsMap;
XContentType sourceContentType = null;
// TODO: The source might parsed and available in the sourceLookup but that one uses unordered maps so different. Do we care?
Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);
sourceContentType = typeMapTuple.v1();
XContentType sourceContentType = typeMapTuple.v1();
sourceAsMap = typeMapTuple.v2();
sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
Expand Down Expand Up @@ -227,33 +228,43 @@ public void parse(ParseContext context) throws IOException {
@Override
protected void parseCreateField(ParseContext context, List<IndexableField> fields) throws IOException {
BytesReference originalSource = context.sourceToParse().source();
BytesReference source = originalSource;
if (enabled && fieldType().stored() && source != null) {
XContentType contentType = context.sourceToParse().getXContentType();
final BytesReference adaptedSource = applyFilters(originalSource, contentType);

if (adaptedSource != null) {
final BytesRef ref = adaptedSource.toBytesRef();
fields.add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length));
}

if (originalSource != null && adaptedSource != originalSource && context.indexSettings().isSoftDeleteEnabled()) {
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
BytesRef ref = originalSource.toBytesRef();
fields.add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
fields.add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1));
}
}

@Nullable
public BytesReference applyFilters(@Nullable BytesReference originalSource, @Nullable XContentType contentType) throws IOException {
if (enabled && fieldType().stored() && originalSource != null) {
// Percolate and tv APIs may not set the source and that is ok, because these APIs will not index any data
if (filter != null) {
// we don't update the context source if we filter, we want to keep it as is...
Tuple<XContentType, Map<String, Object>> mapTuple =
XContentHelper.convertToMap(source, true, context.sourceToParse().getXContentType());
XContentHelper.convertToMap(originalSource, true, contentType);
Map<String, Object> filteredSource = filter.apply(mapTuple.v2());
BytesStreamOutput bStream = new BytesStreamOutput();
XContentType contentType = mapTuple.v1();
XContentBuilder builder = XContentFactory.contentBuilder(contentType, bStream).map(filteredSource);
XContentType actualContentType = mapTuple.v1();
XContentBuilder builder = XContentFactory.contentBuilder(actualContentType, bStream).map(filteredSource);
builder.close();
source = bStream.bytes();
return bStream.bytes();
} else {
return originalSource;
}
BytesRef ref = source.toBytesRef();
fields.add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length));
} else {
source = null;
return null;
}

if (originalSource != null && source != originalSource && context.indexSettings().isSoftDeleteEnabled()) {
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
BytesRef ref = originalSource.toBytesRef();
fields.add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));
fields.add(new NumericDocValuesField(RECOVERY_SOURCE_NAME, 1));
}
}
}

@Override
protected String contentType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,34 @@ public void testGetForUpdate() throws IOException {
closeShards(primary);
}

public void testGetFromTranslogWithSourceMappingOptions() throws IOException {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.build();
String docToIndex = "{\"foo\" : \"foo\", \"bar\" : \"bar\"}";
boolean noSource = randomBoolean();
String sourceOptions = noSource ? "\"enabled\": false" : randomBoolean() ? "\"excludes\": [\"fo*\"]" : "\"includes\": [\"ba*\"]";
String expectedResult = noSource ? "" : "{\"bar\":\"bar\"}";
IndexMetaData metaData = IndexMetaData.builder("test")
.putMapping("test", "{ \"properties\": { \"foo\": { \"type\": \"text\"}, \"bar\": { \"type\": \"text\"}}, \"_source\": { "
+ sourceOptions + "}}}")
.settings(settings)
.primaryTerm(0, 1).build();
IndexShard primary = newShard(new ShardId(metaData.getIndex(), 0), true, "n1", metaData, null);
recoverShardFromStore(primary);
Engine.IndexResult test = indexDoc(primary, "test", "0", docToIndex);
assertTrue(primary.getEngine().refreshNeeded());
GetResult testGet = primary.getService().getForUpdate("0", UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM);
assertFalse(testGet.getFields().containsKey(RoutingFieldMapper.NAME));
assertEquals(new String(testGet.source() == null ? new byte[0] : testGet.source(), StandardCharsets.UTF_8), expectedResult);
try (Engine.Searcher searcher = primary.getEngine().acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
assertEquals(searcher.getIndexReader().maxDoc(), 1); // we refreshed
}

closeShards(primary);
}

public void testTypelessGetForUpdate() throws IOException {
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
Expand Down

0 comments on commit 4c75564

Please sign in to comment.