Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use _refresh instead of reading from Translog in the RT GET case #20102

Merged
merged 10 commits into from
Aug 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.index.engine;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.index.translog.Translog;

/** Holds a deleted version, which just adds a timestamp to {@link VersionValue} so we know when we can expire the deletion. */

Expand All @@ -30,8 +29,8 @@ class DeleteVersionValue extends VersionValue {

private final long time;

public DeleteVersionValue(long version, long time, Translog.Location translogLocation) {
super(version, translogLocation);
public DeleteVersionValue(long version, long time) {
super(version);
this.time = time;
}

Expand All @@ -47,7 +46,6 @@ public boolean delete() {

@Override
public long ramBytesUsed() {
Translog.Location translogLocation = translogLocation();
return BASE_RAM_BYTES_USED + (translogLocation != null ? translogLocation.ramBytesUsed() : 0);
return BASE_RAM_BYTES_USED;
}
}
28 changes: 6 additions & 22 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -1000,32 +1000,23 @@ public Get versionType(VersionType versionType) {
public static class GetResult implements Releasable {
private final boolean exists;
private final long version;
private final Translog.Source source;
private final Versions.DocIdAndVersion docIdAndVersion;
private final Searcher searcher;

public static final GetResult NOT_EXISTS = new GetResult(false, Versions.NOT_FOUND, null);
public static final GetResult NOT_EXISTS = new GetResult(false, Versions.NOT_FOUND, null, null);

/**
* Build a realtime get result from the translog.
*/
public GetResult(boolean exists, long version, @Nullable Translog.Source source) {
this.source = source;
private GetResult(boolean exists, long version, Versions.DocIdAndVersion docIdAndVersion, Searcher searcher) {
this.exists = exists;
this.version = version;
this.docIdAndVersion = null;
this.searcher = null;
this.docIdAndVersion = docIdAndVersion;
this.searcher = searcher;
}

/**
* Build a non-realtime get result from the searcher.
*/
public GetResult(Searcher searcher, Versions.DocIdAndVersion docIdAndVersion) {
this.exists = true;
this.source = null;
this.version = docIdAndVersion.version;
this.docIdAndVersion = docIdAndVersion;
this.searcher = searcher;
this(true, docIdAndVersion.version, docIdAndVersion, searcher);
}

public boolean exists() {
Expand All @@ -1036,11 +1027,6 @@ public long version() {
return this.version;
}

@Nullable
public Translog.Source source() {
return source;
}

public Searcher searcher() {
return this.searcher;
}
Expand All @@ -1055,9 +1041,7 @@ public void close() {
}

public void release() {
if (searcher != null) {
searcher.close();
}
Releasables.close(searcher);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,7 @@ public GetResult get(Get get, Function<String, Searcher> searcherFactory) throws
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(),
get.versionType().explainConflictForReads(versionValue.version(), get.version()));
}
Translog.Operation op = translog.read(versionValue.translogLocation());
if (op != null) {
return new GetResult(true, versionValue.version(), op.getSource());
}
refresh("realtime_get");
}
}

Expand Down Expand Up @@ -368,11 +365,11 @@ private long checkDeletedAndGCed(VersionValue versionValue) {
return currentVersion;
}

private static VersionValueSupplier NEW_VERSION_VALUE = (u, t, l) -> new VersionValue(u, l);
private static VersionValueSupplier NEW_VERSION_VALUE = (u, t) -> new VersionValue(u);

@FunctionalInterface
private interface VersionValueSupplier {
VersionValue apply(long updatedVersion, long time, Translog.Location location);
VersionValue apply(long updatedVersion, long time);
}

private <T extends Engine.Operation> void maybeAddToTranslog(
Expand All @@ -383,14 +380,9 @@ private <T extends Engine.Operation> void maybeAddToTranslog(
if (op.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
final Translog.Location translogLocation = translog.add(toTranslogOp.apply(op));
op.setTranslogLocation(translogLocation);
versionMap.putUnderLock(op.uid().bytes(), toVersionValue.apply(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), op.getTranslogLocation()));
} else {
// we do not replay in to the translog, so there is no
// translog location; that is okay because real-time
// gets are not possible during recovery and we will
// flush when the recovery is complete
versionMap.putUnderLock(op.uid().bytes(), toVersionValue.apply(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), null));
}
versionMap.putUnderLock(op.uid().bytes(), toVersionValue.apply(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis()));

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ private static class Maps {

// Used while refresh is running, and to hold adds/deletes until refresh finishes. We read from both current and old on lookup:
final Map<BytesRef,VersionValue> old;

public Maps(Map<BytesRef,VersionValue> current, Map<BytesRef,VersionValue> old) {
this.current = current;
this.old = old;
Expand Down Expand Up @@ -256,7 +256,7 @@ public long ramBytesUsed() {
return ramBytesUsedCurrent.get() + ramBytesUsedTombstones.get();
}

/** Returns how much RAM would be freed up by refreshing. This is {@link ramBytesUsed} except does not include tombstones because they
/** Returns how much RAM would be freed up by refreshing. This is {@link #ramBytesUsed} except does not include tombstones because they
* don't clear on refresh. */
long ramBytesUsedForRefresh() {
return ramBytesUsedCurrent.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.index.translog.Translog;

import java.util.Collection;
import java.util.Collections;
Expand All @@ -31,11 +30,9 @@ class VersionValue implements Accountable {
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(VersionValue.class);

private final long version;
private final Translog.Location translogLocation;

public VersionValue(long version, Translog.Location translogLocation) {
public VersionValue(long version) {
this.version = version;
this.translogLocation = translogLocation;
}

public long time() {
Expand All @@ -50,13 +47,10 @@ public boolean delete() {
return false;
}

public Translog.Location translogLocation() {
return this.translogLocation;
}

@Override
public long ramBytesUsed() {
return BASE_RAM_BYTES_USED + (translogLocation != null ? translogLocation.ramBytesUsed() : 0);
return BASE_RAM_BYTES_USED;
}

@Override
Expand Down
131 changes: 1 addition & 130 deletions core/src/main/java/org/elasticsearch/index/get/ShardGetService.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ public GetStats stats() {
return new GetStats(existsMetric.count(), TimeUnit.NANOSECONDS.toMillis(existsMetric.sum()), missingMetric.count(), TimeUnit.NANOSECONDS.toMillis(missingMetric.sum()), currentMetric.count());
}


public GetResult get(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType, FetchSourceContext fetchSourceContext, boolean ignoreErrorsOnGeneratedFields) {
currentMetric.inc();
try {
Expand Down Expand Up @@ -182,140 +181,12 @@ private GetResult innerGet(String type, String id, String[] gFields, boolean rea

try {
// break between having loaded it from translog (so we only have _source), and having a document to load
if (get.docIdAndVersion() != null) {
return innerGetLoadFromStoredFields(type, id, gFields, fetchSourceContext, get, mapperService);
} else {
Translog.Source source = get.source();

Map<String, GetField> fields = null;
SearchLookup searchLookup = null;

// we can only load scripts that can run against the source
Set<String> neededFields = new HashSet<>();
// add meta fields
neededFields.add(RoutingFieldMapper.NAME);
DocumentMapper docMapper = mapperService.documentMapper(type);
if (docMapper.parentFieldMapper().active()) {
neededFields.add(ParentFieldMapper.NAME);
}
if (docMapper.timestampFieldMapper().enabled()) {
neededFields.add(TimestampFieldMapper.NAME);
}
if (docMapper.TTLFieldMapper().enabled()) {
neededFields.add(TTLFieldMapper.NAME);
}
// add requested fields
if (gFields != null) {
neededFields.addAll(Arrays.asList(gFields));
}
for (String field : neededFields) {
if (SourceFieldMapper.NAME.equals(field)) {
// dealt with when normalizing fetchSourceContext.
continue;
}
Object value = null;
if (field.equals(RoutingFieldMapper.NAME)) {
value = source.routing;
} else if (field.equals(ParentFieldMapper.NAME) && docMapper.parentFieldMapper().active()) {
value = source.parent;
} else if (field.equals(TimestampFieldMapper.NAME) && docMapper.timestampFieldMapper().enabled()) {
value = source.timestamp;
} else if (field.equals(TTLFieldMapper.NAME) && docMapper.TTLFieldMapper().enabled()) {
// Call value for search with timestamp + ttl here to display the live remaining ttl value and be consistent with the search result display
if (source.ttl > 0) {
value = docMapper.TTLFieldMapper().valueForSearch(source.timestamp + source.ttl);
}
} else {
if (searchLookup == null) {
searchLookup = new SearchLookup(mapperService, null, new String[]{type});
searchLookup.source().setSource(source.source);
}

FieldMapper fieldMapper = docMapper.mappers().smartNameFieldMapper(field);
if (fieldMapper == null) {
if (docMapper.objectMappers().get(field) != null) {
// Only fail if we know it is a object field, missing paths / fields shouldn't fail.
throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
}
} else if (shouldGetFromSource(ignoreErrorsOnGeneratedFields, docMapper, fieldMapper)) {
List<Object> values = searchLookup.source().extractRawValues(field);
if (!values.isEmpty()) {
value = values;
}

}
}
if (value != null) {
if (fields == null) {
fields = new HashMap<>(2);
}
if (value instanceof List) {
fields.put(field, new GetField(field, (List) value));
} else {
fields.put(field, new GetField(field, Collections.singletonList(value)));
}
}
}

// deal with source, but only if it's enabled (we always have it from the translog)
BytesReference sourceToBeReturned = null;
SourceFieldMapper sourceFieldMapper = docMapper.sourceMapper();
if (fetchSourceContext.fetchSource() && sourceFieldMapper.enabled()) {

sourceToBeReturned = source.source;

// Cater for source excludes/includes at the cost of performance
// We must first apply the field mapper filtering to make sure we get correct results
// in the case that the fetchSourceContext white lists something that's not included by the field mapper

boolean sourceFieldFiltering = sourceFieldMapper.includes().length > 0 || sourceFieldMapper.excludes().length > 0;
boolean sourceFetchFiltering = fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0;
if (sourceFieldFiltering || sourceFetchFiltering) {
// TODO: The source might parsed and available in the sourceLookup but that one uses unordered maps so different. Do we care?
Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source.source, true);
XContentType sourceContentType = typeMapTuple.v1();
Map<String, Object> sourceAsMap = typeMapTuple.v2();
if (sourceFieldFiltering) {
sourceAsMap = XContentMapValues.filter(sourceAsMap, sourceFieldMapper.includes(), sourceFieldMapper.excludes());
}
if (sourceFetchFiltering) {
sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
}
try {
sourceToBeReturned = XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap).bytes();
} catch (IOException e) {
throw new ElasticsearchException("Failed to get type [" + type + "] and id [" + id + "] with includes/excludes set", e);
}
}
}

return new GetResult(shardId.getIndexName(), type, id, get.version(), get.exists(), sourceToBeReturned, fields);
}
return innerGetLoadFromStoredFields(type, id, gFields, fetchSourceContext, get, mapperService);
} finally {
get.release();
}
}

protected boolean shouldGetFromSource(boolean ignoreErrorsOnGeneratedFields, DocumentMapper docMapper, FieldMapper fieldMapper) {
if (!fieldMapper.isGenerated()) {
//if the field is always there we check if either source mapper is enabled, in which case we get the field
// from source, or, if the field is stored, in which case we have to get if from source here also (we are in the translog phase, doc not indexed yet, we annot access the stored fields)
return docMapper.sourceMapper().enabled() || fieldMapper.fieldType().stored();
} else {
if (!fieldMapper.fieldType().stored()) {
//if it is not stored, user will not get the generated field back
return false;
} else {
if (ignoreErrorsOnGeneratedFields) {
return false;
} else {
throw new ElasticsearchException("Cannot access field " + fieldMapper.name() + " from transaction log. You can only get this field after refresh() has been called.");
}
}

}
}

private GetResult innerGetLoadFromStoredFields(String type, String id, String[] gFields, FetchSourceContext fetchSourceContext, Engine.GetResult get, MapperService mapperService) {
Map<String, GetField> fields = null;
BytesReference source = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,9 @@ static TermVectorsResponse getTermVectors(IndexShard indexShard, TermVectorsRequ
Engine.GetResult get = indexShard.get(new Engine.Get(request.realtime(), uidTerm).version(request.version()).versionType(request.versionType()));

Fields termVectorsByField = null;
boolean docFromTranslog = get.source() != null;
AggregatedDfs dfs = null;
TermVectorsFilter termVectorsFilter = null;

/* fetched from translog is treated as an artificial document */
if (docFromTranslog) {
request.doc(get.source().source, false);
termVectorsResponse.setDocVersion(get.version());
}

/* handle potential wildcards in fields */
if (request.selectedFields() != null) {
handleFieldWildcards(indexShard, request);
Expand All @@ -103,12 +96,12 @@ static TermVectorsResponse getTermVectors(IndexShard indexShard, TermVectorsRequ
Versions.DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
/* from an artificial document */
if (request.doc() != null) {
termVectorsByField = generateTermVectorsFromDoc(indexShard, request, !docFromTranslog);
termVectorsByField = generateTermVectorsFromDoc(indexShard, request);
// if no document indexed in shard, take the queried document itself for stats
if (topLevelFields == null) {
topLevelFields = termVectorsByField;
}
termVectorsResponse.setArtificial(!docFromTranslog);
termVectorsResponse.setArtificial(true);
termVectorsResponse.setExists(true);
}
/* or from an existing document */
Expand Down Expand Up @@ -252,7 +245,7 @@ private static Fields generateTermVectors(IndexShard indexShard, Collection<GetF
return MultiFields.getFields(index.createSearcher().getIndexReader());
}

private static Fields generateTermVectorsFromDoc(IndexShard indexShard, TermVectorsRequest request, boolean doAllFields) throws IOException {
private static Fields generateTermVectorsFromDoc(IndexShard indexShard, TermVectorsRequest request) throws IOException {
// parse the document, at the moment we do update the mapping, just like percolate
ParsedDocument parsedDocument = parseDocument(indexShard, indexShard.shardId().getIndexName(), request.type(), request.doc());

Expand All @@ -265,9 +258,6 @@ private static Fields generateTermVectorsFromDoc(IndexShard indexShard, TermVect
if (!isValidField(fieldType)) {
continue;
}
if (request.selectedFields() == null && !doAllFields && !fieldType.storeTermVectors()) {
continue;
}
if (request.selectedFields() != null && !request.selectedFields().contains(field.name())) {
continue;
}
Expand Down
Loading