Skip to content

Commit

Permalink
Group field caps response by index mapping hash
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Feb 10, 2022
1 parent 0ddfad4 commit edef31b
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.action.fieldcaps;

import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MappedFieldType;
Expand Down Expand Up @@ -47,7 +48,8 @@ FieldCapabilitiesIndexResponse fetch(
String[] fieldPatterns,
QueryBuilder indexFilter,
long nowInMillis,
Map<String, Object> runtimeFields
Map<String, Object> runtimeFields,
Map<String, Map<String, IndexFieldCapabilities>> mappingHashToFieldCaps
) throws IOException {
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.getId());
Expand All @@ -63,7 +65,16 @@ FieldCapabilitiesIndexResponse fetch(
);

if (canMatchShard(shardId, indexFilter, nowInMillis, searchExecutionContext) == false) {
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), Collections.emptyMap(), false);
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), null, Collections.emptyMap(), false);
}

final MappingMetadata mapping = indexService.getMetadata().mapping();
final String indexMappingHash = mapping != null ? mapping.getSha256() : null;
if (indexMappingHash != null) {
final Map<String, IndexFieldCapabilities> existing = mappingHashToFieldCaps.get(indexMappingHash);
if (existing != null) {
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), indexMappingHash, existing, true);
}
}

Set<String> fieldNames = new HashSet<>();
Expand Down Expand Up @@ -125,7 +136,7 @@ FieldCapabilitiesIndexResponse fetch(
}
}
}
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), responseMap, true);
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), indexMappingHash, responseMap, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,113 @@
package org.elasticsearch.action.fieldcaps;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FieldCapabilitiesIndexResponse implements Writeable {
private static final Version MAPPING_HASH_VERSION = Version.V_8_2_0;

public class FieldCapabilitiesIndexResponse extends ActionResponse implements Writeable {
private final String indexName;
@Nullable
private final String indexMappingHash;
private final Map<String, IndexFieldCapabilities> responseMap;
private final boolean canMatch;
private final transient Version originVersion;

FieldCapabilitiesIndexResponse(String indexName, Map<String, IndexFieldCapabilities> responseMap, boolean canMatch) {
FieldCapabilitiesIndexResponse(
String indexName,
@Nullable String indexMappingHash,
Map<String, IndexFieldCapabilities> responseMap,
boolean canMatch
) {
this.indexName = indexName;
this.indexMappingHash = indexMappingHash;
this.responseMap = responseMap;
this.canMatch = canMatch;
this.originVersion = Version.CURRENT;
}

FieldCapabilitiesIndexResponse(StreamInput in) throws IOException {
super(in);
this.indexName = in.readString();
this.responseMap = in.readMap(StreamInput::readString, IndexFieldCapabilities::new);
this.canMatch = in.readBoolean();
this.originVersion = in.getVersion();
if (in.getVersion().onOrAfter(MAPPING_HASH_VERSION)) {
this.indexMappingHash = in.readOptionalString();
} else {
this.indexMappingHash = null;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(indexName);
out.writeMap(responseMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
out.writeBoolean(canMatch);
if (out.getVersion().onOrAfter(MAPPING_HASH_VERSION)) {
out.writeOptionalString(indexMappingHash);
}
}

private record GroupByMappingHash(List<String> indices, String indexMappingHash, Map<String, IndexFieldCapabilities> responseMap)
implements
Writeable {
GroupByMappingHash(StreamInput in) throws IOException {
this(in.readStringList(), in.readString(), in.readMap(StreamInput::readString, IndexFieldCapabilities::new));
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeStringCollection(indices);
out.writeString(indexMappingHash);
out.writeMap(responseMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
}

List<FieldCapabilitiesIndexResponse> getResponses() {
return indices.stream().map(index -> new FieldCapabilitiesIndexResponse(index, indexMappingHash, responseMap, true)).toList();
}
}

static List<FieldCapabilitiesIndexResponse> readList(StreamInput input) throws IOException {
if (input.getVersion().before(MAPPING_HASH_VERSION)) {
return input.readList(FieldCapabilitiesIndexResponse::new);
}
final List<FieldCapabilitiesIndexResponse> ungroupedList = input.readList(FieldCapabilitiesIndexResponse::new);
final List<GroupByMappingHash> groups = input.readList(GroupByMappingHash::new);
return Stream.concat(ungroupedList.stream(), groups.stream().flatMap(g -> g.getResponses().stream())).toList();
}

static void writeList(StreamOutput output, List<FieldCapabilitiesIndexResponse> responses) throws IOException {
if (output.getVersion().before(MAPPING_HASH_VERSION)) {
output.writeCollection(responses);
return;
}
final Predicate<FieldCapabilitiesIndexResponse> canGroup = r -> r.canMatch && r.indexMappingHash != null;
final List<FieldCapabilitiesIndexResponse> ungroupedResponses = responses.stream().filter(r -> canGroup.test(r) == false).toList();
final List<GroupByMappingHash> groupedResponses = responses.stream()
.filter(canGroup)
.collect(Collectors.groupingBy(r -> r.indexMappingHash))
.values()
.stream()
.map(rs -> {
final String indexMappingHash = rs.get(0).indexMappingHash;
final Map<String, IndexFieldCapabilities> responseMap = rs.get(0).responseMap;
final List<String> indices = rs.stream().map(r -> r.indexName).toList();
return new GroupByMappingHash(indices, indexMappingHash, responseMap);
})
.toList();
output.writeList(ungroupedResponses);
output.writeList(groupedResponses);
}

/**
Expand All @@ -46,6 +125,14 @@ public String getIndexName() {
return indexName;
}

/**
* Returns the index mapping hash associated with this index if exists
*/
@Nullable
public String getIndexMappingHash() {
return indexMappingHash;
}

public boolean canMatch() {
return canMatch;
}
Expand All @@ -69,23 +156,19 @@ Version getOriginVersion() {
return originVersion;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(indexName);
out.writeMap(responseMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
out.writeBoolean(canMatch);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FieldCapabilitiesIndexResponse that = (FieldCapabilitiesIndexResponse) o;
return canMatch == that.canMatch && Objects.equals(indexName, that.indexName) && Objects.equals(responseMap, that.responseMap);
return canMatch == that.canMatch
&& Objects.equals(indexName, that.indexName)
&& Objects.equals(indexMappingHash, that.indexMappingHash)
&& Objects.equals(responseMap, that.responseMap);
}

@Override
public int hashCode() {
return Objects.hash(indexName, responseMap, canMatch);
return Objects.hash(indexName, indexMappingHash, responseMap, canMatch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ class FieldCapabilitiesNodeResponse extends ActionResponse implements Writeable

FieldCapabilitiesNodeResponse(StreamInput in) throws IOException {
super(in);
this.indexResponses = in.readList(FieldCapabilitiesIndexResponse::new);
this.indexResponses = FieldCapabilitiesIndexResponse.readList(in);
this.failures = in.readMap(ShardId::new, StreamInput::readException);
this.unmatchedShardIds = in.readSet(ShardId::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(indexResponses);
FieldCapabilitiesIndexResponse.writeList(out, indexResponses);
out.writeMap(failures, (o, v) -> v.writeTo(o), StreamOutput::writeException);
out.writeCollection(unmatchedShardIds);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public FieldCapabilitiesResponse(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
this.responseMap = in.readMap(StreamInput::readString, FieldCapabilitiesResponse::readField);
indexResponses = in.readList(FieldCapabilitiesIndexResponse::new);
this.indexResponses = FieldCapabilitiesIndexResponse.readList(in);
this.failures = in.readList(FieldCapabilitiesFailure::new);
}

Expand Down Expand Up @@ -141,7 +141,7 @@ private static Map<String, FieldCapabilities> readField(StreamInput in) throws I
public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(indices);
out.writeMap(responseMap, StreamOutput::writeString, FieldCapabilitiesResponse::writeField);
out.writeList(indexResponses);
FieldCapabilitiesIndexResponse.writeList(out, indexResponses);
out.writeList(failures);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -113,6 +114,16 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
checkIndexBlocks(clusterState, concreteIndices);

final Map<String, FieldCapabilitiesIndexResponse> indexResponses = Collections.synchronizedMap(new HashMap<>());
final Map<String, Map<String, IndexFieldCapabilities>> indexMappingHashToResponses = Collections.synchronizedMap(new HashMap<>());
final Consumer<FieldCapabilitiesIndexResponse> handleIndexResponse = resp -> {
if (resp.canMatch() && resp.getIndexMappingHash() != null) {
Map<String, IndexFieldCapabilities> curr = indexMappingHashToResponses.putIfAbsent(resp.getIndexMappingHash(), resp.get());
if (curr != null) {
resp = new FieldCapabilitiesIndexResponse(resp.getIndexName(), resp.getIndexMappingHash(), curr, true);
}
}
indexResponses.putIfAbsent(resp.getIndexName(), resp);
};
final FailureCollector indexFailures = new FailureCollector();
// One for each cluster including the local cluster
final CountDown completionCounter = new CountDown(1 + remoteClusterIndices.size());
Expand All @@ -126,7 +137,7 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
nowInMillis,
concreteIndices,
threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION),
indexResponse -> indexResponses.putIfAbsent(indexResponse.getIndexName(), indexResponse),
handleIndexResponse,
indexFailures::collect,
countDown
);
Expand All @@ -142,7 +153,9 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
remoteClusterClient.fieldCaps(remoteRequest, ActionListener.wrap(response -> {
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
String indexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName());
indexResponses.putIfAbsent(indexName, new FieldCapabilitiesIndexResponse(indexName, resp.get(), resp.canMatch()));
handleIndexResponse.accept(
new FieldCapabilitiesIndexResponse(indexName, resp.getIndexMappingHash(), resp.get(), resp.canMatch())
);
}
for (FieldCapabilitiesFailure failure : response.getFailures()) {
Exception ex = failure.getException();
Expand Down Expand Up @@ -347,6 +360,7 @@ public void messageReceived(FieldCapabilitiesNodeRequest request, TransportChann
final Map<String, List<ShardId>> groupedShardIds = request.shardIds()
.stream()
.collect(Collectors.groupingBy(ShardId::getIndexName));
final Map<String, Map<String, IndexFieldCapabilities>> indexMappingHashToResponses = new HashMap<>();
for (List<ShardId> shardIds : groupedShardIds.values()) {
final Map<ShardId, Exception> failures = new HashMap<>();
final Set<ShardId> unmatched = new HashSet<>();
Expand All @@ -357,12 +371,16 @@ public void messageReceived(FieldCapabilitiesNodeRequest request, TransportChann
request.fields(),
request.indexFilter(),
request.nowInMillis(),
request.runtimeFields()
request.runtimeFields(),
indexMappingHashToResponses
);
if (response.canMatch()) {
unmatched.clear();
failures.clear();
allResponses.add(response);
if (response.getIndexMappingHash() != null) {
indexMappingHashToResponses.putIfAbsent(response.getIndexMappingHash(), response.get());
}
break;
} else {
unmatched.add(shardId);
Expand Down
Loading

0 comments on commit edef31b

Please sign in to comment.