From edef31b36e0f876afc3261f701172ac5f025ddc5 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 9 Feb 2022 16:39:09 -0500 Subject: [PATCH] Group field caps response by index mapping hash --- .../fieldcaps/FieldCapabilitiesFetcher.java | 17 ++- .../FieldCapabilitiesIndexResponse.java | 109 +++++++++++++++--- .../FieldCapabilitiesNodeResponse.java | 4 +- .../fieldcaps/FieldCapabilitiesResponse.java | 4 +- .../TransportFieldCapabilitiesAction.java | 24 +++- .../FieldCapabilitiesNodeResponseTests.java | 90 +++++++++++++++ .../FieldCapabilitiesResponseTests.java | 2 +- 7 files changed, 226 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFetcher.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFetcher.java index 7eac2f54c1137..d34971244535d 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFetcher.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFetcher.java @@ -8,6 +8,7 @@ package org.elasticsearch.action.fieldcaps; +import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.MappedFieldType; @@ -47,7 +48,8 @@ FieldCapabilitiesIndexResponse fetch( String[] fieldPatterns, QueryBuilder indexFilter, long nowInMillis, - Map runtimeFields + Map runtimeFields, + Map> mappingHashToFieldCaps ) throws IOException { final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); final IndexShard indexShard = indexService.getShard(shardId.getId()); @@ -63,7 +65,16 @@ FieldCapabilitiesIndexResponse fetch( ); if (canMatchShard(shardId, indexFilter, nowInMillis, searchExecutionContext) == false) { - return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), Collections.emptyMap(), false); + return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), null, Collections.emptyMap(), false); + } + + final MappingMetadata mapping = indexService.getMetadata().mapping(); + final String indexMappingHash = mapping != null ? mapping.getSha256() : null; + if (indexMappingHash != null) { + final Map existing = mappingHashToFieldCaps.get(indexMappingHash); + if (existing != null) { + return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), indexMappingHash, existing, true); + } } Set fieldNames = new HashSet<>(); @@ -125,7 +136,7 @@ FieldCapabilitiesIndexResponse fetch( } } } - return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), responseMap, true); + return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), indexMappingHash, responseMap, true); } } diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java index 397018208202b..3d03cfc92e1e2 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java @@ -9,34 +9,113 @@ package org.elasticsearch.action.fieldcaps; import org.elasticsearch.Version; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.Nullable; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +final class FieldCapabilitiesIndexResponse implements Writeable { + private static final Version MAPPING_HASH_VERSION = Version.V_8_2_0; -public class FieldCapabilitiesIndexResponse extends ActionResponse implements Writeable { private final String indexName; + @Nullable + private final String indexMappingHash; private final Map responseMap; private final boolean canMatch; private final transient Version originVersion; - FieldCapabilitiesIndexResponse(String indexName, Map responseMap, boolean canMatch) { + FieldCapabilitiesIndexResponse( + String indexName, + @Nullable String indexMappingHash, + Map responseMap, + boolean canMatch + ) { this.indexName = indexName; + this.indexMappingHash = indexMappingHash; this.responseMap = responseMap; this.canMatch = canMatch; this.originVersion = Version.CURRENT; } FieldCapabilitiesIndexResponse(StreamInput in) throws IOException { - super(in); this.indexName = in.readString(); this.responseMap = in.readMap(StreamInput::readString, IndexFieldCapabilities::new); this.canMatch = in.readBoolean(); this.originVersion = in.getVersion(); + if (in.getVersion().onOrAfter(MAPPING_HASH_VERSION)) { + this.indexMappingHash = in.readOptionalString(); + } else { + this.indexMappingHash = null; + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(indexName); + out.writeMap(responseMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut)); + out.writeBoolean(canMatch); + if (out.getVersion().onOrAfter(MAPPING_HASH_VERSION)) { + out.writeOptionalString(indexMappingHash); + } + } + + private record GroupByMappingHash(List indices, String indexMappingHash, Map responseMap) + implements + Writeable { + GroupByMappingHash(StreamInput in) throws IOException { + this(in.readStringList(), in.readString(), in.readMap(StreamInput::readString, IndexFieldCapabilities::new)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeStringCollection(indices); + out.writeString(indexMappingHash); + out.writeMap(responseMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut)); + } + + List getResponses() { + return indices.stream().map(index -> new FieldCapabilitiesIndexResponse(index, indexMappingHash, responseMap, true)).toList(); + } + } + + static List readList(StreamInput input) throws IOException { + if (input.getVersion().before(MAPPING_HASH_VERSION)) { + return input.readList(FieldCapabilitiesIndexResponse::new); + } + final List ungroupedList = input.readList(FieldCapabilitiesIndexResponse::new); + final List groups = input.readList(GroupByMappingHash::new); + return Stream.concat(ungroupedList.stream(), groups.stream().flatMap(g -> g.getResponses().stream())).toList(); + } + + static void writeList(StreamOutput output, List responses) throws IOException { + if (output.getVersion().before(MAPPING_HASH_VERSION)) { + output.writeCollection(responses); + return; + } + final Predicate canGroup = r -> r.canMatch && r.indexMappingHash != null; + final List ungroupedResponses = responses.stream().filter(r -> canGroup.test(r) == false).toList(); + final List groupedResponses = responses.stream() + .filter(canGroup) + .collect(Collectors.groupingBy(r -> r.indexMappingHash)) + .values() + .stream() + .map(rs -> { + final String indexMappingHash = rs.get(0).indexMappingHash; + final Map responseMap = rs.get(0).responseMap; + final List indices = rs.stream().map(r -> r.indexName).toList(); + return new GroupByMappingHash(indices, indexMappingHash, responseMap); + }) + .toList(); + output.writeList(ungroupedResponses); + output.writeList(groupedResponses); } /** @@ -46,6 +125,14 @@ public String getIndexName() { return indexName; } + /** + * Returns the index mapping hash associated with this index if exists + */ + @Nullable + public String getIndexMappingHash() { + return indexMappingHash; + } + public boolean canMatch() { return canMatch; } @@ -69,23 +156,19 @@ Version getOriginVersion() { return originVersion; } - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(indexName); - out.writeMap(responseMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut)); - out.writeBoolean(canMatch); - } - @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; FieldCapabilitiesIndexResponse that = (FieldCapabilitiesIndexResponse) o; - return canMatch == that.canMatch && Objects.equals(indexName, that.indexName) && Objects.equals(responseMap, that.responseMap); + return canMatch == that.canMatch + && Objects.equals(indexName, that.indexName) + && Objects.equals(indexMappingHash, that.indexMappingHash) + && Objects.equals(responseMap, that.responseMap); } @Override public int hashCode() { - return Objects.hash(indexName, responseMap, canMatch); + return Objects.hash(indexName, indexMappingHash, responseMap, canMatch); } } diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponse.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponse.java index 6d103fbe863cc..91f079cadbd99 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponse.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponse.java @@ -37,14 +37,14 @@ class FieldCapabilitiesNodeResponse extends ActionResponse implements Writeable FieldCapabilitiesNodeResponse(StreamInput in) throws IOException { super(in); - this.indexResponses = in.readList(FieldCapabilitiesIndexResponse::new); + this.indexResponses = FieldCapabilitiesIndexResponse.readList(in); this.failures = in.readMap(ShardId::new, StreamInput::readException); this.unmatchedShardIds = in.readSet(ShardId::new); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeList(indexResponses); + FieldCapabilitiesIndexResponse.writeList(out, indexResponses); out.writeMap(failures, (o, v) -> v.writeTo(o), StreamOutput::writeException); out.writeCollection(unmatchedShardIds); } diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java index 7e14fb667c96e..dba604db2faf2 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponse.java @@ -75,7 +75,7 @@ public FieldCapabilitiesResponse(StreamInput in) throws IOException { super(in); indices = in.readStringArray(); this.responseMap = in.readMap(StreamInput::readString, FieldCapabilitiesResponse::readField); - indexResponses = in.readList(FieldCapabilitiesIndexResponse::new); + this.indexResponses = FieldCapabilitiesIndexResponse.readList(in); this.failures = in.readList(FieldCapabilitiesFailure::new); } @@ -141,7 +141,7 @@ private static Map readField(StreamInput in) throws I public void writeTo(StreamOutput out) throws IOException { out.writeStringArray(indices); out.writeMap(responseMap, StreamOutput::writeString, FieldCapabilitiesResponse::writeField); - out.writeList(indexResponses); + FieldCapabilitiesIndexResponse.writeList(out, indexResponses); out.writeList(failures); } diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 16413339d719d..ec553bb90bec2 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -113,6 +114,16 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti checkIndexBlocks(clusterState, concreteIndices); final Map indexResponses = Collections.synchronizedMap(new HashMap<>()); + final Map> indexMappingHashToResponses = Collections.synchronizedMap(new HashMap<>()); + final Consumer handleIndexResponse = resp -> { + if (resp.canMatch() && resp.getIndexMappingHash() != null) { + Map curr = indexMappingHashToResponses.putIfAbsent(resp.getIndexMappingHash(), resp.get()); + if (curr != null) { + resp = new FieldCapabilitiesIndexResponse(resp.getIndexName(), resp.getIndexMappingHash(), curr, true); + } + } + indexResponses.putIfAbsent(resp.getIndexName(), resp); + }; final FailureCollector indexFailures = new FailureCollector(); // One for each cluster including the local cluster final CountDown completionCounter = new CountDown(1 + remoteClusterIndices.size()); @@ -126,7 +137,7 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti nowInMillis, concreteIndices, threadPool.executor(ThreadPool.Names.SEARCH_COORDINATION), - indexResponse -> indexResponses.putIfAbsent(indexResponse.getIndexName(), indexResponse), + handleIndexResponse, indexFailures::collect, countDown ); @@ -142,7 +153,9 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti remoteClusterClient.fieldCaps(remoteRequest, ActionListener.wrap(response -> { for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) { String indexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName()); - indexResponses.putIfAbsent(indexName, new FieldCapabilitiesIndexResponse(indexName, resp.get(), resp.canMatch())); + handleIndexResponse.accept( + new FieldCapabilitiesIndexResponse(indexName, resp.getIndexMappingHash(), resp.get(), resp.canMatch()) + ); } for (FieldCapabilitiesFailure failure : response.getFailures()) { Exception ex = failure.getException(); @@ -347,6 +360,7 @@ public void messageReceived(FieldCapabilitiesNodeRequest request, TransportChann final Map> groupedShardIds = request.shardIds() .stream() .collect(Collectors.groupingBy(ShardId::getIndexName)); + final Map> indexMappingHashToResponses = new HashMap<>(); for (List shardIds : groupedShardIds.values()) { final Map failures = new HashMap<>(); final Set unmatched = new HashSet<>(); @@ -357,12 +371,16 @@ public void messageReceived(FieldCapabilitiesNodeRequest request, TransportChann request.fields(), request.indexFilter(), request.nowInMillis(), - request.runtimeFields() + request.runtimeFields(), + indexMappingHashToResponses ); if (response.canMatch()) { unmatched.clear(); failures.clear(); allResponses.add(response); + if (response.getIndexMappingHash() != null) { + indexMappingHashToResponses.putIfAbsent(response.getIndexMappingHash(), response.get()); + } break; } else { unmatched.add(shardId); diff --git a/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponseTests.java b/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponseTests.java index 6a132a6f663d1..5e7409f54bf70 100644 --- a/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponseTests.java @@ -8,15 +8,28 @@ package org.elasticsearch.action.fieldcaps; +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.AbstractWireSerializingTestCase; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.equalTo; public class FieldCapabilitiesNodeResponseTests extends AbstractWireSerializingTestCase { @@ -57,4 +70,81 @@ protected FieldCapabilitiesNodeResponse mutateInstance(FieldCapabilitiesNodeResp } return new FieldCapabilitiesNodeResponse(newResponses, Collections.emptyMap(), response.getUnmatchedShardIds()); } + + public void testShareResponsesUsingMappingHash() throws Exception { + final Supplier> randomFieldCaps = () -> { + final Map fieldCaps = new HashMap<>(); + final List fields = randomList(1, 5, () -> randomAlphaOfLength(5)); + for (String field : fields) { + final IndexFieldCapabilities fieldCap = new IndexFieldCapabilities( + field, + randomAlphaOfLengthBetween(5, 20), + randomBoolean(), + randomBoolean(), + randomBoolean(), + false, + null, + Map.of() + ); + fieldCaps.put(field, fieldCap); + } + return fieldCaps; + }; + final List inList = new ArrayList<>(); + int numGroups = randomIntBetween(0, 20); + Map> mappingHashToIndices = new HashMap<>(); + for (int i = 0; i < numGroups; i++) { + String groupName = "group_" + i; + String hashing = UUIDs.randomBase64UUID(); + List indices = IntStream.range(0, randomIntBetween(1, 5)).mapToObj(n -> groupName + "_" + n).toList(); + mappingHashToIndices.put(hashing, indices); + Map fieldCaps = randomFieldCaps.get(); + for (String index : indices) { + inList.add(new FieldCapabilitiesIndexResponse(index, hashing, fieldCaps, true)); + } + } + int numUngroups = randomIntBetween(0, 5); + for (int i = 0; i < numUngroups; i++) { + String index = "ungrouped_" + i; + final String hashing; + final boolean canMatch; + Map fieldCaps = Map.of(); + if (randomBoolean()) { + canMatch = false; + hashing = UUIDs.randomBase64UUID(); + } else { + canMatch = randomBoolean(); + hashing = null; + if (canMatch) { + fieldCaps = randomFieldCaps.get(); + } + } + inList.add(new FieldCapabilitiesIndexResponse(index, hashing, fieldCaps, canMatch)); + } + Randomness.shuffle(inList); + final List serializedList; + try (BytesStreamOutput output = new BytesStreamOutput()) { + FieldCapabilitiesIndexResponse.writeList(output, inList); + try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), getNamedWriteableRegistry())) { + serializedList = FieldCapabilitiesIndexResponse.readList(in); + } + } + assertThat( + serializedList.stream().sorted(Comparator.comparing(FieldCapabilitiesIndexResponse::getIndexName)).toList(), + equalTo(inList.stream().sorted(Comparator.comparing(FieldCapabilitiesIndexResponse::getIndexName)).toList()) + ); + Map> groupedResponses = serializedList.stream() + .filter(r -> r.canMatch() && r.getIndexMappingHash() != null) + .collect(Collectors.groupingBy(FieldCapabilitiesIndexResponse::getIndexMappingHash)); + assertThat(groupedResponses.keySet(), equalTo(mappingHashToIndices.keySet())); + for (Map.Entry> e : groupedResponses.entrySet()) { + List indices = mappingHashToIndices.get(e.getKey()); + List rs = e.getValue(); + assertThat(rs.stream().map(FieldCapabilitiesIndexResponse::getIndexName).sorted().toList(), equalTo(indices)); + for (FieldCapabilitiesIndexResponse r : rs) { + assertTrue(r.canMatch()); + assertSame(r.get(), rs.get(0).get()); + } + } + } } diff --git a/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponseTests.java b/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponseTests.java index fbea856caface..1f43f88c192ae 100644 --- a/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponseTests.java @@ -60,7 +60,7 @@ public static FieldCapabilitiesIndexResponse randomIndexResponse(String index, b for (String field : fields) { responses.put(field, randomFieldCaps(field)); } - return new FieldCapabilitiesIndexResponse(index, responses, canMatch); + return new FieldCapabilitiesIndexResponse(index, null, responses, canMatch); } public static IndexFieldCapabilities randomFieldCaps(String fieldName) {