Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Group field caps response by index mapping hash #83494

Merged
merged 9 commits into from
Feb 17, 2022
7 changes: 7 additions & 0 deletions docs/changelog/83494.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 83494
summary: Group field caps response by index mapping hash
area: Search
type: enhancement
issues:
- 78665
- 82879
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.fieldcaps;

import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MappedFieldType;
Expand Down Expand Up @@ -37,6 +38,7 @@
*/
class FieldCapabilitiesFetcher {
private final IndicesService indicesService;
private final Map<String, Map<String, IndexFieldCapabilities>> indexMappingHashToResponses = new HashMap<>();

FieldCapabilitiesFetcher(IndicesService indicesService) {
this.indicesService = indicesService;
Expand Down Expand Up @@ -65,17 +67,34 @@ FieldCapabilitiesIndexResponse fetch(
);

if (canMatchShard(shardId, indexFilter, nowInMillis, searchExecutionContext) == false) {
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), Collections.emptyMap(), false);
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), null, Collections.emptyMap(), false);
}

Predicate<String> fieldPredicate = indicesService.getFieldFilter().apply(shardId.getIndexName());
final MappingMetadata mapping = indexService.getMetadata().mapping();
final String indexMappingHash = mapping != null ? mapping.getSha256() : null;
if (indexMappingHash != null) {
final Map<String, IndexFieldCapabilities> existing = indexMappingHashToResponses.get(indexMappingHash);
if (existing != null) {
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), indexMappingHash, existing, true);
}
}

return retrieveFieldCaps(shardId.getIndexName(), searchExecutionContext, fieldPatterns, filters, fieldTypes, fieldPredicate);
Predicate<String> fieldPredicate = indicesService.getFieldFilter().apply(shardId.getIndexName());
final Map<String, IndexFieldCapabilities> responseMap = retrieveFieldCaps(
searchExecutionContext,
fieldPatterns,
filters,
fieldTypes,
fieldPredicate
);
if (indexMappingHash != null) {
indexMappingHashToResponses.put(indexMappingHash, responseMap);
}
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), indexMappingHash, responseMap, true);
}
}

public static FieldCapabilitiesIndexResponse retrieveFieldCaps(
String indexName,
static Map<String, IndexFieldCapabilities> retrieveFieldCaps(
SearchExecutionContext context,
String[] fieldPatterns,
String[] filters,
Expand Down Expand Up @@ -141,7 +160,7 @@ public static FieldCapabilitiesIndexResponse retrieveFieldCaps(
}
}
}
return new FieldCapabilitiesIndexResponse(indexName, responseMap, true);
return responseMap;
}

private static boolean checkIncludeParents(String[] filters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,113 @@
package org.elasticsearch.action.fieldcaps;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FieldCapabilitiesIndexResponse implements Writeable {
private static final Version MAPPING_HASH_VERSION = Version.V_8_2_0;

public class FieldCapabilitiesIndexResponse extends ActionResponse implements Writeable {
private final String indexName;
@Nullable
private final String indexMappingHash;
private final Map<String, IndexFieldCapabilities> responseMap;
private final boolean canMatch;
private final transient Version originVersion;

FieldCapabilitiesIndexResponse(String indexName, Map<String, IndexFieldCapabilities> responseMap, boolean canMatch) {
FieldCapabilitiesIndexResponse(
String indexName,
@Nullable String indexMappingHash,
Map<String, IndexFieldCapabilities> responseMap,
boolean canMatch
) {
this.indexName = indexName;
this.indexMappingHash = indexMappingHash;
this.responseMap = responseMap;
this.canMatch = canMatch;
this.originVersion = Version.CURRENT;
}

FieldCapabilitiesIndexResponse(StreamInput in) throws IOException {
super(in);
this.indexName = in.readString();
this.responseMap = in.readMap(StreamInput::readString, IndexFieldCapabilities::new);
this.canMatch = in.readBoolean();
this.originVersion = in.getVersion();
if (in.getVersion().onOrAfter(MAPPING_HASH_VERSION)) {
this.indexMappingHash = in.readOptionalString();
} else {
this.indexMappingHash = null;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(indexName);
out.writeMap(responseMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
out.writeBoolean(canMatch);
if (out.getVersion().onOrAfter(MAPPING_HASH_VERSION)) {
out.writeOptionalString(indexMappingHash);
}
}

private record GroupByMappingHash(List<String> indices, String indexMappingHash, Map<String, IndexFieldCapabilities> responseMap)
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
implements
Writeable {
GroupByMappingHash(StreamInput in) throws IOException {
this(in.readStringList(), in.readString(), in.readMap(StreamInput::readString, IndexFieldCapabilities::new));
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeStringCollection(indices);
out.writeString(indexMappingHash);
out.writeMap(responseMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
}

List<FieldCapabilitiesIndexResponse> getResponses() {
return indices.stream().map(index -> new FieldCapabilitiesIndexResponse(index, indexMappingHash, responseMap, true)).toList();
}
}

static List<FieldCapabilitiesIndexResponse> readList(StreamInput input) throws IOException {
if (input.getVersion().before(MAPPING_HASH_VERSION)) {
return input.readList(FieldCapabilitiesIndexResponse::new);
}
final List<FieldCapabilitiesIndexResponse> ungroupedList = input.readList(FieldCapabilitiesIndexResponse::new);
final List<GroupByMappingHash> groups = input.readList(GroupByMappingHash::new);
return Stream.concat(ungroupedList.stream(), groups.stream().flatMap(g -> g.getResponses().stream())).toList();
}

static void writeList(StreamOutput output, List<FieldCapabilitiesIndexResponse> responses) throws IOException {
if (output.getVersion().before(MAPPING_HASH_VERSION)) {
output.writeCollection(responses);
return;
}
final Predicate<FieldCapabilitiesIndexResponse> canGroup = r -> r.canMatch && r.indexMappingHash != null;
final List<FieldCapabilitiesIndexResponse> ungroupedResponses = responses.stream().filter(r -> canGroup.test(r) == false).toList();
final List<GroupByMappingHash> groupedResponses = responses.stream()
.filter(canGroup)
.collect(Collectors.groupingBy(r -> r.indexMappingHash))
.values()
.stream()
.map(rs -> {
final String indexMappingHash = rs.get(0).indexMappingHash;
final Map<String, IndexFieldCapabilities> responseMap = rs.get(0).responseMap;
final List<String> indices = rs.stream().map(r -> r.indexName).toList();
return new GroupByMappingHash(indices, indexMappingHash, responseMap);
})
.toList();
output.writeList(ungroupedResponses);
output.writeList(groupedResponses);
}

/**
Expand All @@ -46,6 +125,14 @@ public String getIndexName() {
return indexName;
}

/**
* Returns the index mapping hash associated with this index if exists
*/
@Nullable
public String getIndexMappingHash() {
return indexMappingHash;
}

public boolean canMatch() {
return canMatch;
}
Expand All @@ -69,23 +156,19 @@ Version getOriginVersion() {
return originVersion;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(indexName);
out.writeMap(responseMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
out.writeBoolean(canMatch);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FieldCapabilitiesIndexResponse that = (FieldCapabilitiesIndexResponse) o;
return canMatch == that.canMatch && Objects.equals(indexName, that.indexName) && Objects.equals(responseMap, that.responseMap);
return canMatch == that.canMatch
&& Objects.equals(indexName, that.indexName)
&& Objects.equals(indexMappingHash, that.indexMappingHash)
&& Objects.equals(responseMap, that.responseMap);
}

@Override
public int hashCode() {
return Objects.hash(indexName, responseMap, canMatch);
return Objects.hash(indexName, indexMappingHash, responseMap, canMatch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ class FieldCapabilitiesNodeResponse extends ActionResponse implements Writeable

FieldCapabilitiesNodeResponse(StreamInput in) throws IOException {
super(in);
this.indexResponses = in.readList(FieldCapabilitiesIndexResponse::new);
this.indexResponses = FieldCapabilitiesIndexResponse.readList(in);
this.failures = in.readMap(ShardId::new, StreamInput::readException);
this.unmatchedShardIds = in.readSet(ShardId::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(indexResponses);
FieldCapabilitiesIndexResponse.writeList(out, indexResponses);
out.writeMap(failures, (o, v) -> v.writeTo(o), StreamOutput::writeException);
out.writeCollection(unmatchedShardIds);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public FieldCapabilitiesResponse(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
this.responseMap = in.readMap(StreamInput::readString, FieldCapabilitiesResponse::readField);
indexResponses = in.readList(FieldCapabilitiesIndexResponse::new);
this.indexResponses = FieldCapabilitiesIndexResponse.readList(in);
this.failures = in.readList(FieldCapabilitiesFailure::new);
}

Expand Down Expand Up @@ -141,7 +141,7 @@ private static Map<String, FieldCapabilities> readField(StreamInput in) throws I
public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(indices);
out.writeMap(responseMap, StreamOutput::writeString, FieldCapabilitiesResponse::writeField);
out.writeList(indexResponses);
FieldCapabilitiesIndexResponse.writeList(out, indexResponses);
out.writeList(failures);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand All @@ -55,8 +56,8 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
private final ClusterService clusterService;
private final IndexNameExpressionResolver indexNameExpressionResolver;

private final FieldCapabilitiesFetcher fieldCapabilitiesFetcher;
private final Predicate<String> metadataFieldPred;
private final IndicesService indicesService;
private final boolean ccsCheckCompatibility;

@Inject
Expand All @@ -73,7 +74,7 @@ public TransportFieldCapabilitiesAction(
this.transportService = transportService;
this.clusterService = clusterService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.fieldCapabilitiesFetcher = new FieldCapabilitiesFetcher(indicesService);
this.indicesService = indicesService;
final Set<String> metadataFields = indicesService.getAllMetadataFields();
this.metadataFieldPred = metadataFields::contains;
transportService.registerRequestHandler(
Expand Down Expand Up @@ -112,6 +113,17 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
checkIndexBlocks(clusterState, concreteIndices);

final Map<String, FieldCapabilitiesIndexResponse> indexResponses = Collections.synchronizedMap(new HashMap<>());
// This map is used to share the index response for indices which have the same index mapping hash to reduce the memory usage.
final Map<String, Map<String, IndexFieldCapabilities>> indexMappingHashToResponses = Collections.synchronizedMap(new HashMap<>());
jtibshirani marked this conversation as resolved.
Show resolved Hide resolved
final Consumer<FieldCapabilitiesIndexResponse> handleIndexResponse = resp -> {
if (resp.canMatch() && resp.getIndexMappingHash() != null) {
jtibshirani marked this conversation as resolved.
Show resolved Hide resolved
Map<String, IndexFieldCapabilities> curr = indexMappingHashToResponses.putIfAbsent(resp.getIndexMappingHash(), resp.get());
if (curr != null) {
resp = new FieldCapabilitiesIndexResponse(resp.getIndexName(), resp.getIndexMappingHash(), curr, true);
}
}
indexResponses.putIfAbsent(resp.getIndexName(), resp);
};
final FailureCollector indexFailures = new FailureCollector();
// One for each cluster including the local cluster
final CountDown completionCounter = new CountDown(1 + remoteClusterIndices.size());
Expand All @@ -125,7 +137,7 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
nowInMillis,
concreteIndices,
threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION),
indexResponse -> indexResponses.putIfAbsent(indexResponse.getIndexName(), indexResponse),
handleIndexResponse,
indexFailures::collect,
countDown
);
Expand All @@ -141,7 +153,9 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
remoteClusterClient.fieldCaps(remoteRequest, ActionListener.wrap(response -> {
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
String indexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName());
indexResponses.putIfAbsent(indexName, new FieldCapabilitiesIndexResponse(indexName, resp.get(), resp.canMatch()));
handleIndexResponse.accept(
new FieldCapabilitiesIndexResponse(indexName, resp.getIndexMappingHash(), resp.get(), resp.canMatch())
);
}
for (FieldCapabilitiesFailure failure : response.getFailures()) {
Exception ex = failure.getException();
Expand Down Expand Up @@ -347,12 +361,13 @@ public void messageReceived(FieldCapabilitiesNodeRequest request, TransportChann
final Map<String, List<ShardId>> groupedShardIds = request.shardIds()
.stream()
.collect(Collectors.groupingBy(ShardId::getIndexName));
final FieldCapabilitiesFetcher fetcher = new FieldCapabilitiesFetcher(indicesService);
for (List<ShardId> shardIds : groupedShardIds.values()) {
final Map<ShardId, Exception> failures = new HashMap<>();
final Set<ShardId> unmatched = new HashSet<>();
for (ShardId shardId : shardIds) {
try {
final FieldCapabilitiesIndexResponse response = fieldCapabilitiesFetcher.fetch(
final FieldCapabilitiesIndexResponse response = fetcher.fetch(
shardId,
request.fields(),
request.filters(),
Expand Down
Loading