From 97522d9d92e108031652e5375e464c47ba738eec Mon Sep 17 00:00:00 2001 From: Julie Tibshirani Date: Mon, 30 Aug 2021 15:20:37 -0700 Subject: [PATCH 1/8] Pull out shard operation into its own class --- .../fieldcaps/FieldCapabilitiesFetcher.java | 117 ++++++++++++++++++ ...TransportFieldCapabilitiesIndexAction.java | 96 +------------- 2 files changed, 120 insertions(+), 93 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFetcher.java diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFetcher.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFetcher.java new file mode 100644 index 0000000000000..6d6b542d1c957 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFetcher.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.fieldcaps; + +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.ObjectMapper; +import org.elasticsearch.index.mapper.RuntimeField; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.query.SearchExecutionContext; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.ShardSearchRequest; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; + +/** + * Loads the mappings for an index and computes all {@link IndexFieldCapabilities}. This + * helper class performs the core shard operation for the field capabilities action. + */ +class FieldCapabilitiesFetcher { + private final IndicesService indicesService; + + FieldCapabilitiesFetcher(IndicesService indicesService) { + this.indicesService = indicesService; + } + + public FieldCapabilitiesIndexResponse fetch(final FieldCapabilitiesIndexRequest request) throws IOException { + final ShardId shardId = request.shardId(); + final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + final IndexShard indexShard = indexService.getShard(request.shardId().getId()); + try (Engine.Searcher searcher = indexShard.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE)) { + + final SearchExecutionContext searchExecutionContext = indexService.newSearchExecutionContext(shardId.id(), 0, + searcher, request::nowInMillis, null, request.runtimeFields()); + + if (canMatchShard(request, searchExecutionContext) == false) { + return new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false); + } + + Set fieldNames = new HashSet<>(); + for (String pattern : request.fields()) { + fieldNames.addAll(searchExecutionContext.getMatchingFieldNames(pattern)); + } + + Predicate fieldPredicate = indicesService.getFieldFilter().apply(shardId.getIndexName()); + Map responseMap = new HashMap<>(); + for (String field : fieldNames) { + MappedFieldType ft = searchExecutionContext.getFieldType(field); + boolean isMetadataField = searchExecutionContext.isMetadataField(field); + if (isMetadataField || fieldPredicate.test(ft.name())) { + IndexFieldCapabilities fieldCap = new IndexFieldCapabilities(field, + ft.familyTypeName(), isMetadataField, ft.isSearchable(), ft.isAggregatable(), ft.meta()); + responseMap.put(field, fieldCap); + } else { + continue; + } + + // Check the ancestor of the field to find nested and object fields. + // Runtime fields are excluded since they can override any path. + //TODO find a way to do this that does not require an instanceof check + if (ft instanceof RuntimeField == false) { + int dotIndex = ft.name().lastIndexOf('.'); + while (dotIndex > -1) { + String parentField = ft.name().substring(0, dotIndex); + if (responseMap.containsKey(parentField)) { + // we added this path on another field already + break; + } + // checks if the parent field contains sub-fields + if (searchExecutionContext.getFieldType(parentField) == null) { + // no field type, it must be an object field + ObjectMapper mapper = searchExecutionContext.getObjectMapper(parentField); + // Composite runtime fields do not have a mapped type for the root - check for null + if (mapper != null) { + String type = mapper.isNested() ? "nested" : "object"; + IndexFieldCapabilities fieldCap = new IndexFieldCapabilities(parentField, type, + false, false, false, Collections.emptyMap()); + responseMap.put(parentField, fieldCap); + } + } + dotIndex = parentField.lastIndexOf('.'); + } + } + } + return new FieldCapabilitiesIndexResponse(request.index(), responseMap, true); + } + } + + private boolean canMatchShard(FieldCapabilitiesIndexRequest req, SearchExecutionContext searchExecutionContext) throws IOException { + if (req.indexFilter() == null || req.indexFilter() instanceof MatchAllQueryBuilder) { + return true; + } + assert req.nowInMillis() != 0L; + ShardSearchRequest searchRequest = new ShardSearchRequest(req.shardId(), null, req.nowInMillis(), AliasFilter.EMPTY); + searchRequest.source(new SearchSourceBuilder().query(req.indexFilter())); + return SearchService.queryStillMatchesAfterRewrite(searchRequest, searchExecutionContext); + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java index ae64895db41f7..71224323e8fbc 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java @@ -27,20 +27,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.core.Nullable; -import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.mapper.MappedFieldType; -import org.elasticsearch.index.mapper.ObjectMapper; -import org.elasticsearch.index.mapper.RuntimeField; -import org.elasticsearch.index.query.MatchAllQueryBuilder; -import org.elasticsearch.index.query.SearchExecutionContext; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.search.SearchService; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportChannel; @@ -51,11 +38,6 @@ import java.io.IOException; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.function.Predicate; import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException; @@ -69,7 +51,7 @@ public class TransportFieldCapabilitiesIndexAction private final ClusterService clusterService; private final TransportService transportService; - private final IndicesService indicesService; + private final FieldCapabilitiesFetcher fieldCapabilitiesFetcher; @Inject public TransportFieldCapabilitiesIndexAction(ClusterService clusterService, @@ -79,7 +61,7 @@ public TransportFieldCapabilitiesIndexAction(ClusterService clusterService, super(ACTION_NAME, transportService, actionFilters, FieldCapabilitiesIndexRequest::new); this.clusterService = clusterService; this.transportService = transportService; - this.indicesService = indicesService; + this.fieldCapabilitiesFetcher = new FieldCapabilitiesFetcher(indicesService); transportService.registerRequestHandler(ACTION_SHARD_NAME, ThreadPool.Names.SAME, FieldCapabilitiesIndexRequest::new, new ShardTransportHandler()); } @@ -89,78 +71,6 @@ protected void doExecute(Task task, FieldCapabilitiesIndexRequest request, Actio new AsyncShardsAction(transportService, clusterService, request, listener).start(); } - private FieldCapabilitiesIndexResponse shardOperation(final FieldCapabilitiesIndexRequest request) throws IOException { - final ShardId shardId = request.shardId(); - final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - final IndexShard indexShard = indexService.getShard(request.shardId().getId()); - try (Engine.Searcher searcher = indexShard.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE)) { - - final SearchExecutionContext searchExecutionContext = indexService.newSearchExecutionContext(shardId.id(), 0, - searcher, request::nowInMillis, null, request.runtimeFields()); - - if (canMatchShard(request, searchExecutionContext) == false) { - return new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false); - } - - Set fieldNames = new HashSet<>(); - for (String pattern : request.fields()) { - fieldNames.addAll(searchExecutionContext.getMatchingFieldNames(pattern)); - } - - Predicate fieldPredicate = indicesService.getFieldFilter().apply(shardId.getIndexName()); - Map responseMap = new HashMap<>(); - for (String field : fieldNames) { - MappedFieldType ft = searchExecutionContext.getFieldType(field); - boolean isMetadataField = searchExecutionContext.isMetadataField(field); - if (isMetadataField || fieldPredicate.test(ft.name())) { - IndexFieldCapabilities fieldCap = new IndexFieldCapabilities(field, - ft.familyTypeName(), isMetadataField, ft.isSearchable(), ft.isAggregatable(), ft.meta()); - responseMap.put(field, fieldCap); - } else { - continue; - } - - // Check the ancestor of the field to find nested and object fields. - // Runtime fields are excluded since they can override any path. - //TODO find a way to do this that does not require an instanceof check - if (ft instanceof RuntimeField == false) { - int dotIndex = ft.name().lastIndexOf('.'); - while (dotIndex > -1) { - String parentField = ft.name().substring(0, dotIndex); - if (responseMap.containsKey(parentField)) { - // we added this path on another field already - break; - } - // checks if the parent field contains sub-fields - if (searchExecutionContext.getFieldType(parentField) == null) { - // no field type, it must be an object field - ObjectMapper mapper = searchExecutionContext.getObjectMapper(parentField); - // Composite runtime fields do not have a mapped type for the root - check for null - if (mapper != null) { - String type = mapper.isNested() ? "nested" : "object"; - IndexFieldCapabilities fieldCap = new IndexFieldCapabilities(parentField, type, - false, false, false, Collections.emptyMap()); - responseMap.put(parentField, fieldCap); - } - } - dotIndex = parentField.lastIndexOf('.'); - } - } - } - return new FieldCapabilitiesIndexResponse(request.index(), responseMap, true); - } - } - - private boolean canMatchShard(FieldCapabilitiesIndexRequest req, SearchExecutionContext searchExecutionContext) throws IOException { - if (req.indexFilter() == null || req.indexFilter() instanceof MatchAllQueryBuilder) { - return true; - } - assert req.nowInMillis() != 0L; - ShardSearchRequest searchRequest = new ShardSearchRequest(req.shardId(), null, req.nowInMillis(), AliasFilter.EMPTY); - searchRequest.source(new SearchSourceBuilder().query(req.indexFilter())); - return SearchService.queryStillMatchesAfterRewrite(searchRequest, searchExecutionContext); - } - /** * An action that executes on each shard sequentially until it finds one that can match the provided * {@link FieldCapabilitiesIndexRequest#indexFilter()}. In which case the shard is used @@ -290,7 +200,7 @@ public void messageReceived(final FieldCapabilitiesIndexRequest request, ActionListener listener = new ChannelActionListener<>(channel, ACTION_SHARD_NAME, request); final FieldCapabilitiesIndexResponse resp; try { - resp = shardOperation(request); + resp = fieldCapabilitiesFetcher.fetch(request); } catch (Exception exc) { listener.onFailure(exc); return; From 999b7a57d0dee40d9acd1c4f91b4b3ad612c3a4e Mon Sep 17 00:00:00 2001 From: Julie Tibshirani Date: Mon, 30 Aug 2021 18:40:47 -0700 Subject: [PATCH 2/8] Add request + response classes, plus tests --- .../elasticsearch/action/OriginalIndices.java | 16 +++ .../FieldCapabilitiesNodeRequest.java | 126 ++++++++++++++++++ .../FieldCapabilitiesNodeResponse.java | 72 ++++++++++ .../FieldCapabilitiesNodeRequestTests.java | 113 ++++++++++++++++ .../FieldCapabilitiesNodeResponseTests.java | 55 ++++++++ .../FieldCapabilitiesResponseTests.java | 2 +- 6 files changed, 383 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeRequest.java create mode 100644 server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponse.java create mode 100644 server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeRequestTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponseTests.java diff --git a/server/src/main/java/org/elasticsearch/action/OriginalIndices.java b/server/src/main/java/org/elasticsearch/action/OriginalIndices.java index 7b2f03287bb7a..82627211fd7b4 100644 --- a/server/src/main/java/org/elasticsearch/action/OriginalIndices.java +++ b/server/src/main/java/org/elasticsearch/action/OriginalIndices.java @@ -14,6 +14,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Objects; /** * Used to keep track of original indices within internal (e.g. shard level) requests @@ -67,4 +68,19 @@ public String toString() { ", indicesOptions=" + indicesOptions + '}'; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + OriginalIndices that = (OriginalIndices) o; + return Arrays.equals(indices, that.indices) && Objects.equals(indicesOptions, that.indicesOptions); + } + + @Override + public int hashCode() { + int result = Objects.hash(indicesOptions); + result = 31 * result + Arrays.hashCode(indices); + return result; + } } diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeRequest.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeRequest.java new file mode 100644 index 0000000000000..9803d20432cbf --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeRequest.java @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.fieldcaps; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; + +class FieldCapabilitiesNodeRequest extends ActionRequest implements IndicesRequest { + + private final ShardId[] shardIds; + private final String[] fields; + private final OriginalIndices originalIndices; + private final QueryBuilder indexFilter; + private final long nowInMillis; + private final Map runtimeFields; + + FieldCapabilitiesNodeRequest(StreamInput in) throws IOException { + super(in); + shardIds = in.readArray(ShardId::new, ShardId[]::new); + fields = in.readStringArray(); + originalIndices = OriginalIndices.readOriginalIndices(in); + indexFilter = in.readOptionalNamedWriteable(QueryBuilder.class); + nowInMillis = in.readLong(); + runtimeFields = in.readMap(); + } + + FieldCapabilitiesNodeRequest(ShardId[] shardIds, + String[] fields, + OriginalIndices originalIndices, + QueryBuilder indexFilter, + long nowInMillis, + Map runtimeFields) { + this.fields = fields; + this.shardIds = shardIds; + this.originalIndices = originalIndices; + this.indexFilter = indexFilter; + this.nowInMillis = nowInMillis; + this.runtimeFields = runtimeFields; + } + + public String[] fields() { + return fields; + } + + public OriginalIndices originalIndices() { + return originalIndices; + } + + @Override + public String[] indices() { + return originalIndices.indices(); + } + + @Override + public IndicesOptions indicesOptions() { + return originalIndices.indicesOptions(); + } + + public QueryBuilder indexFilter() { + return indexFilter; + } + + public Map runtimeFields() { + return runtimeFields; + } + + public ShardId[] shardIds() { + return shardIds; + } + + public long nowInMillis() { + return nowInMillis; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeArray(shardIds); + out.writeStringArray(fields); + OriginalIndices.writeOriginalIndices(originalIndices, out); + out.writeOptionalNamedWriteable(indexFilter); + out.writeLong(nowInMillis); + out.writeMap(runtimeFields); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FieldCapabilitiesNodeRequest that = (FieldCapabilitiesNodeRequest) o; + return nowInMillis == that.nowInMillis && Arrays.equals(shardIds, that.shardIds) + && Arrays.equals(fields, that.fields) && Objects.equals(originalIndices, that.originalIndices) + && Objects.equals(indexFilter, that.indexFilter) && Objects.equals(runtimeFields, that.runtimeFields); + } + + @Override + public int hashCode() { + int result = Objects.hash(originalIndices, indexFilter, nowInMillis, runtimeFields); + result = 31 * result + Arrays.hashCode(shardIds); + result = 31 * result + Arrays.hashCode(fields); + return result; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponse.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponse.java new file mode 100644 index 0000000000000..497096b9732f8 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponse.java @@ -0,0 +1,72 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.fieldcaps; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +class FieldCapabilitiesNodeResponse extends ActionResponse implements Writeable { + private final String[] indices; + private final List failures; + private final List indexResponses; + + FieldCapabilitiesNodeResponse(String[] indices, + List indexResponses, + List failures) { + this.indexResponses = Objects.requireNonNull(indexResponses); + this.indices = indices; + this.failures = failures; + } + + FieldCapabilitiesNodeResponse(StreamInput in) throws IOException { + super(in); + indices = in.readStringArray(); + indexResponses = in.readList(FieldCapabilitiesIndexResponse::new); + this.failures = in.readList(FieldCapabilitiesFailure::new); + } + + public List getFailures() { + return failures; + } + + public List getIndexResponses() { + return indexResponses; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeStringArray(indices); + out.writeList(indexResponses); + out.writeList(failures); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FieldCapabilitiesNodeResponse that = (FieldCapabilitiesNodeResponse) o; + return Arrays.equals(indices, that.indices) && + Objects.equals(indexResponses, that.indexResponses) && + Objects.equals(failures, that.failures); + } + + @Override + public int hashCode() { + int result = Objects.hash( indexResponses, failures); + result = 31 * result + Arrays.hashCode(indices); + return result; + } +} diff --git a/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeRequestTests.java b/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeRequestTests.java new file mode 100644 index 0000000000000..aef61b7f51393 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeRequestTests.java @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.fieldcaps; + +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +public class FieldCapabilitiesNodeRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected FieldCapabilitiesNodeRequest createTestInstance() { + ShardId[] randomShards = randomShardIds(randomIntBetween(1, 5)); + String[] randomFields = randomFields(randomIntBetween(1, 20)); + OriginalIndices originalIndices = randomOriginalIndices(randomIntBetween(0, 20)); + + QueryBuilder indexFilter = randomBoolean() ? QueryBuilders.termQuery("field", randomAlphaOfLength(5)) : null; + long nowInMillis = randomLong(); + + Map runtimeFields = randomBoolean() + ? Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)) + : null; + + return new FieldCapabilitiesNodeRequest(randomShards, randomFields, originalIndices, indexFilter, nowInMillis, runtimeFields); + } + + private ShardId[] randomShardIds(int numShards) { + ShardId[] randomShards = new ShardId[numShards]; + for (int i = 0; i < numShards; i++) { + randomShards[i] = new ShardId("index", randomAlphaOfLength(10), i); + } + return randomShards; + } + + private String[] randomFields(int numFields) { + String[] randomFields = new String[numFields]; + for (int i = 0; i < numFields; i++) { + randomFields[i] = randomAlphaOfLengthBetween(5, 10); + } + return randomFields; + } + + private OriginalIndices randomOriginalIndices(int numIndices) { + String[] randomIndices = new String[numIndices]; + for (int i = 0; i < numIndices; i++) { + randomIndices[i] = randomAlphaOfLengthBetween(5, 10); + } + IndicesOptions indicesOptions = randomBoolean() ? IndicesOptions.strictExpand() : IndicesOptions.lenientExpandOpen(); + return new OriginalIndices(randomIndices, indicesOptions); + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList()); + return new NamedWriteableRegistry(searchModule.getNamedWriteables()); + } + + @Override + protected Writeable.Reader instanceReader() { + return FieldCapabilitiesNodeRequest::new; + } + + @Override + protected FieldCapabilitiesNodeRequest mutateInstance(FieldCapabilitiesNodeRequest instance) throws IOException { + switch (random().nextInt(5)) { + case 0: + ShardId[] shardIds = randomShardIds(instance.shardIds().length + 1); + return new FieldCapabilitiesNodeRequest(shardIds, instance.fields(), instance.originalIndices(), + instance.indexFilter(), instance.nowInMillis(), instance.runtimeFields()); + case 1: + String[] fields = randomFields(instance.fields().length + 2); + return new FieldCapabilitiesNodeRequest(instance.shardIds(), fields, instance.originalIndices(), + instance.indexFilter(), instance.nowInMillis(), instance.runtimeFields()); + case 2: + OriginalIndices originalIndices = randomOriginalIndices(instance.indices().length + 1); + return new FieldCapabilitiesNodeRequest(instance.shardIds(), instance.fields(), originalIndices, + instance.indexFilter(), instance.nowInMillis(), instance.runtimeFields()); + case 3: + QueryBuilder indexFilter = instance.indexFilter() == null ? QueryBuilders.matchAllQuery() : null; + return new FieldCapabilitiesNodeRequest(instance.shardIds(), instance.fields(), instance.originalIndices(), + indexFilter, instance.nowInMillis(), instance.runtimeFields()); + case 4: + long nowInMillis = instance.nowInMillis() + 100; + return new FieldCapabilitiesNodeRequest(instance.shardIds(), instance.fields(), instance.originalIndices(), + instance.indexFilter(), nowInMillis, instance.runtimeFields()); + case 5: + Map runtimeFields = instance.runtimeFields() == null + ? Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)) + : null; + return new FieldCapabilitiesNodeRequest(instance.shardIds(), instance.fields(), instance.originalIndices(), + instance.indexFilter(), instance.nowInMillis(), runtimeFields); + default: + throw new IllegalStateException("The test should only allow 5 parameters mutated"); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponseTests.java b/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponseTests.java new file mode 100644 index 0000000000000..5d4a3f51ff2e0 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponseTests.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.fieldcaps; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class FieldCapabilitiesNodeResponseTests extends AbstractWireSerializingTestCase { + + @Override + protected FieldCapabilitiesNodeResponse createTestInstance() { + List responses = new ArrayList<>(); + int numResponse = randomIntBetween(0, 10); + for (int i = 0; i < numResponse; i++) { + responses.add(FieldCapabilitiesResponseTests.createRandomIndexResponse()); + } + String[] randomIndices = generateRandomStringArray(5, 10, false, false); + return new FieldCapabilitiesNodeResponse(randomIndices, responses, Collections.emptyList()); + } + + @Override + protected Writeable.Reader instanceReader() { + return FieldCapabilitiesNodeResponse::new; + } + + @Override + protected FieldCapabilitiesNodeResponse mutateInstance(FieldCapabilitiesNodeResponse response) { + List newResponses = new ArrayList<>(response.getIndexResponses()); + int mutation = response.getIndexResponses().isEmpty() ? 0 : randomIntBetween(0, 2); + switch (mutation) { + case 0: + newResponses.add(FieldCapabilitiesResponseTests.createRandomIndexResponse()); + break; + case 1: + int toRemove = randomInt(newResponses.size() - 1); + newResponses.remove(toRemove); + break; + case 2: + int toReplace = randomInt(newResponses.size() - 1); + newResponses.set(toReplace, FieldCapabilitiesResponseTests.createRandomIndexResponse()); + break; + } + return new FieldCapabilitiesNodeResponse(null, newResponses, Collections.emptyList()); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponseTests.java b/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponseTests.java index 5d05f4c2dc227..12ae6614dfe11 100644 --- a/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponseTests.java @@ -51,7 +51,7 @@ protected Writeable.Reader instanceReader() { return FieldCapabilitiesResponse::new; } - private FieldCapabilitiesIndexResponse createRandomIndexResponse() { + static FieldCapabilitiesIndexResponse createRandomIndexResponse() { Map responses = new HashMap<>(); String[] fields = generateRandomStringArray(5, 10, false, true); From 48cb0afc49c5ac4e2bf47a0880c2a170694ee502 Mon Sep 17 00:00:00 2001 From: Julie Tibshirani Date: Mon, 30 Aug 2021 18:41:33 -0700 Subject: [PATCH 3/8] Group index requests into node requests --- .../TransportFieldCapabilitiesAction.java | 205 +++++++++++++++--- 1 file changed, 175 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 728d2f39d2e59..317fc084aecde 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -11,23 +11,34 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; -import org.elasticsearch.core.Tuple; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportService; import java.util.ArrayList; @@ -41,10 +52,14 @@ import java.util.function.Predicate; public class TransportFieldCapabilitiesAction extends HandledTransportAction { + public static final String ACTION_NODE_NAME = FieldCapabilitiesAction.NAME + "[n]"; + private final ThreadPool threadPool; private final TransportService transportService; private final ClusterService clusterService; private final IndexNameExpressionResolver indexNameExpressionResolver; + + private final FieldCapabilitiesFetcher fieldCapabilitiesFetcher; private final Predicate metadataFieldPred; @Inject @@ -59,8 +74,13 @@ public TransportFieldCapabilitiesAction(TransportService transportService, this.transportService = transportService; this.clusterService = clusterService; this.indexNameExpressionResolver = indexNameExpressionResolver; + + this.fieldCapabilitiesFetcher = new FieldCapabilitiesFetcher(indicesService); final Set metadataFields = indicesService.getAllMetadataFields(); this.metadataFieldPred = metadataFields::contains; + + transportService.registerRequestHandler(ACTION_NODE_NAME, ThreadPool.Names.SAME, + FieldCapabilitiesNodeRequest::new, new NodeTransportHandler()); } @Override @@ -80,44 +100,92 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, localIndices); } - checkIndexBlocks(clusterState, concreteIndices); - - final int totalNumRequest = concreteIndices.length + remoteClusterIndices.size(); - if (totalNumRequest == 0) { + if (concreteIndices.length == 0 && remoteClusterIndices.isEmpty()) { listener.onResponse(new FieldCapabilitiesResponse(new String[0], Collections.emptyMap())); return; } + checkIndexBlocks(clusterState, concreteIndices); + final List indexResponses = Collections.synchronizedList(new ArrayList<>()); final FailureCollector indexFailures = new FailureCollector(); - final Runnable countDown = createResponseMerger(request, totalNumRequest, indexResponses, indexFailures, listener); + // If all nodes are on version 7.16 or higher, then we group the shard requests and send a single request per node. + // Otherwise, for backwards compatibility we follow the old strategy of sending a separate request per shard. + final Map> shardsByNode; + final CountDown completionCounter; + if (clusterState.getNodes().getMinNodeVersion().onOrAfter(Version.V_7_16_0) && request.indexFilter() == null) { + shardsByNode = groupShardsByNode(clusterState, concreteIndices, indexFailures); + completionCounter = new CountDown(shardsByNode.size() + remoteClusterIndices.size()); + } else { + shardsByNode = null; + completionCounter = new CountDown(concreteIndices.length + remoteClusterIndices.size()); + } + + final Runnable countDown = createResponseMerger(request, completionCounter, indexResponses, indexFailures, listener); if (concreteIndices.length > 0) { // fork this action to the management pool as it can fan out to a large number of child requests that get handled on SAME and // thus would all run on the current transport thread and block it for an unacceptable amount of time // (particularly with security enabled) threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(ActionRunnable.wrap(listener, l -> { - for (String index : concreteIndices) { - new TransportFieldCapabilitiesIndexAction.AsyncShardsAction( - transportService, - clusterService, - prepareLocalIndexRequest(request, index, localIndices, nowInMillis), - new ActionListener() { - @Override - public void onResponse(FieldCapabilitiesIndexResponse result) { - if (result.canMatch()) { - indexResponses.add(result); + if (shardsByNode != null) { + for (Map.Entry> entry : shardsByNode.entrySet()) { + String nodeId = entry.getKey(); + List shardIds = entry.getValue(); + + DiscoveryNode node = clusterState.getNodes().get(nodeId); + assert node != null; + + FieldCapabilitiesNodeRequest nodeRequest = prepareLocalNodeRequest(request, shardIds, localIndices, nowInMillis); + transportService.sendRequest(node, ACTION_NODE_NAME, nodeRequest, new ActionListenerResponseHandler<>( + new ActionListener() { + @Override + public void onResponse(FieldCapabilitiesNodeResponse response) { + for (FieldCapabilitiesIndexResponse indexResponse : response.getIndexResponses()) { + if (indexResponse.canMatch()) { + indexResponses.add(indexResponse); + } + + } + for (FieldCapabilitiesFailure indexFailure : response.getFailures()) { + indexFailures.collect(indexFailure.getException(), indexFailure.getIndices()); + } + countDown.run(); + } + + @Override + public void onFailure(Exception e) { + for (ShardId shardId : shardIds) { + indexFailures.collect(e, shardId.getIndexName()); + } + countDown.run(); + } + }, + FieldCapabilitiesNodeResponse::new)); + } + } else { + for (String index : concreteIndices) { + new TransportFieldCapabilitiesIndexAction.AsyncShardsAction( + transportService, + clusterService, + prepareLocalIndexRequest(request, index, localIndices, nowInMillis), + new ActionListener() { + @Override + public void onResponse(FieldCapabilitiesIndexResponse result) { + if (result.canMatch()) { + indexResponses.add(result); + } + countDown.run(); } - countDown.run(); - } - @Override - public void onFailure(Exception e) { - indexFailures.collect(e, index); - countDown.run(); + @Override + public void onFailure(Exception e) { + indexFailures.collect(e, index); + countDown.run(); + } } - } - ).start(); + ).start(); + } } })); } @@ -160,11 +228,10 @@ private void checkIndexBlocks(ClusterState clusterState, String[] concreteIndice } private Runnable createResponseMerger(FieldCapabilitiesRequest request, - int totalNumRequests, + CountDown completionCounter, List indexResponses, FailureCollector indexFailures, ActionListener listener) { - final CountDown completionCounter = new CountDown(totalNumRequests); return () -> { if (completionCounter.countDown()) { List failures = indexFailures.values(); @@ -193,10 +260,19 @@ private Runnable createResponseMerger(FieldCapabilitiesRequest request, }; } - private static FieldCapabilitiesIndexRequest prepareLocalIndexRequest(FieldCapabilitiesRequest request, - String index, - OriginalIndices originalIndices, - long nowInMillis) { + private static FieldCapabilitiesNodeRequest prepareLocalNodeRequest(FieldCapabilitiesRequest request, + List shardIds, + OriginalIndices originalIndices, + long nowInMillis) { + ShardId[] shardIdArray = shardIds.toArray(new ShardId[0]); + return new FieldCapabilitiesNodeRequest(shardIdArray, request.fields(), originalIndices, + request.indexFilter(), nowInMillis, request.runtimeFields()); + } + + private static FieldCapabilitiesIndexRequest prepareLocalIndexRequest(FieldCapabilitiesRequest request, + String index, + OriginalIndices originalIndices, + long nowInMillis) { return new FieldCapabilitiesIndexRequest(request.fields(), index, originalIndices, request.indexFilter(), nowInMillis, request.runtimeFields()); } @@ -215,6 +291,40 @@ private static FieldCapabilitiesRequest prepareRemoteRequest(FieldCapabilitiesRe return remoteRequest; } + private Map> groupShardsByNode(ClusterState clusterState, + String[] concreteIndices, + FailureCollector indexFailures) { + Map> shardsByNodeId = new HashMap<>(); + for (String indexName : concreteIndices) { + GroupShardsIterator shards = clusterService.operationRouting() + .searchShards(clusterState, new String[]{ indexName }, null, null); + + ShardRouting selectedCopy = null; + for (ShardIterator shardCopies : shards) { + for (ShardRouting copy : shardCopies) { + if (copy.active() && copy.assignedToNode()) { + selectedCopy = copy; + break; + } + } + + if (selectedCopy != null) { + String nodeId = selectedCopy.currentNodeId(); + List shardGroup = shardsByNodeId.computeIfAbsent(nodeId, key -> new ArrayList<>()); + shardGroup.add(selectedCopy.shardId()); + break; + } + } + + if (selectedCopy == null) { + Exception e = new NoShardAvailableActionException(null, + LoggerMessageFormat.format("No shard available for index [{}]", indexName)); + indexFailures.collect(e, indexName); + } + } + return shardsByNodeId; + } + private FieldCapabilitiesResponse merge( List indexResponses, boolean includeUnmapped, @@ -289,6 +399,12 @@ void collect(Exception e, String index) { ); } + void collect(Exception e, String[] indices) { + for (String index : indices) { + collect(e, index); + } + } + void collectRemoteException(Exception ex, String clusterAlias, String[] remoteIndices) { for (String failedIndex : remoteIndices) { collect(ex, RemoteClusterAware.buildRemoteIndexName(clusterAlias, failedIndex)); @@ -299,4 +415,33 @@ int size() { return this.indexFailures.size(); } } + + private class NodeTransportHandler implements TransportRequestHandler { + @Override + public void messageReceived(final FieldCapabilitiesNodeRequest request, + final TransportChannel channel, + Task task) throws Exception { + ActionListener listener = new ChannelActionListener<>(channel, ACTION_NODE_NAME, request); + + List indexResponses = new ArrayList<>(); + List indexFailures = new ArrayList<>(); + + for (ShardId shardId : request.shardIds()) { + FieldCapabilitiesIndexRequest indexRequest = new FieldCapabilitiesIndexRequest(request.fields(), shardId.getIndexName(), + request.originalIndices(), request.indexFilter(), request.nowInMillis(), request.runtimeFields()); + indexRequest.shardId(shardId); + try { + FieldCapabilitiesIndexResponse indexResponse = fieldCapabilitiesFetcher.fetch(indexRequest); + indexResponses.add(indexResponse); + } catch (Exception e) { + FieldCapabilitiesFailure failure = new FieldCapabilitiesFailure(new String[] {shardId.getIndexName()}, e); + indexFailures.add(failure); + } + } + + FieldCapabilitiesNodeResponse response = new FieldCapabilitiesNodeResponse(request.indices(), + indexResponses, indexFailures); + listener.onResponse(response); + } + } } From ae9db42d0719f2e69ba3d45ff6d8c21f9b3734df Mon Sep 17 00:00:00 2001 From: Julie Tibshirani Date: Mon, 30 Aug 2021 16:37:47 -0700 Subject: [PATCH 4/8] Refactor collection to handle multiple responses per index --- .../fieldcaps/FieldCapabilitiesFailure.java | 5 -- .../TransportFieldCapabilitiesAction.java | 73 +++++++++++-------- 2 files changed, 41 insertions(+), 37 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFailure.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFailure.java index 9015f1de18e0e..9273cbfb5fd06 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFailure.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFailure.java @@ -101,9 +101,4 @@ FieldCapabilitiesFailure addIndex(String index) { this.indices.add(index); return this; } - - FieldCapabilitiesFailure addIndices(List indices) { - this.indices.addAll(indices); - return this; - } } diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 317fc084aecde..785eadf85013b 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -107,7 +107,7 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti checkIndexBlocks(clusterState, concreteIndices); - final List indexResponses = Collections.synchronizedList(new ArrayList<>()); + final Map indexResponses = Collections.synchronizedMap(new HashMap<>()); final FailureCollector indexFailures = new FailureCollector(); // If all nodes are on version 7.16 or higher, then we group the shard requests and send a single request per node. @@ -143,7 +143,7 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti public void onResponse(FieldCapabilitiesNodeResponse response) { for (FieldCapabilitiesIndexResponse indexResponse : response.getIndexResponses()) { if (indexResponse.canMatch()) { - indexResponses.add(indexResponse); + indexResponses.putIfAbsent(indexResponse.getIndexName(), indexResponse); } } @@ -173,7 +173,7 @@ public void onFailure(Exception e) { @Override public void onResponse(FieldCapabilitiesIndexResponse result) { if (result.canMatch()) { - indexResponses.add(result); + indexResponses.putIfAbsent(result.getIndexName(), result); } countDown.run(); } @@ -199,13 +199,9 @@ public void onFailure(Exception e) { FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(request, originalIndices, nowInMillis); remoteClusterClient.fieldCaps(remoteRequest, ActionListener.wrap(response -> { for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) { - indexResponses.add( - new FieldCapabilitiesIndexResponse( - RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName()), - resp.get(), - resp.canMatch() - ) - ); + String indexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName()); + indexResponses.putIfAbsent(indexName, + new FieldCapabilitiesIndexResponse(indexName, resp.get(), resp.canMatch())); } for (FieldCapabilitiesFailure failure : response.getFailures()) { Exception ex = failure.getException(); @@ -229,12 +225,12 @@ private void checkIndexBlocks(ClusterState clusterState, String[] concreteIndice private Runnable createResponseMerger(FieldCapabilitiesRequest request, CountDown completionCounter, - List indexResponses, + Map indexResponses, FailureCollector indexFailures, ActionListener listener) { return () -> { if (completionCounter.countDown()) { - List failures = indexFailures.values(); + List failures = indexFailures.build(indexResponses.keySet()); if (indexResponses.size() > 0) { if (request.isMergeResults()) { // fork off to the management pool for merging the responses as the operation can run for longer than is acceptable @@ -245,11 +241,13 @@ private Runnable createResponseMerger(FieldCapabilitiesRequest request, () -> merge(indexResponses, request.includeUnmapped(), new ArrayList<>(failures))) ); } else { - listener.onResponse(new FieldCapabilitiesResponse(indexResponses, new ArrayList<>(failures))); + listener.onResponse(new FieldCapabilitiesResponse( + new ArrayList<>(indexResponses.values()), + new ArrayList<>(failures))); } } else { // we have no responses at all, maybe because of errors - if (indexFailures.size() > 0) { + if (indexFailures.isEmpty() == false) { // throw back the first exception listener.onFailure(failures.iterator().next().getException()); } else { @@ -326,13 +324,13 @@ private Map> groupShardsByNode(ClusterState clusterState, } private FieldCapabilitiesResponse merge( - List indexResponses, + Map indexResponses, boolean includeUnmapped, List failures ) { - String[] indices = indexResponses.stream().map(FieldCapabilitiesIndexResponse::getIndexName).sorted().toArray(String[]::new); + String[] indices = indexResponses.keySet().stream().sorted().toArray(String[]::new); final Map> responseMapBuilder = new HashMap<>(); - for (FieldCapabilitiesIndexResponse response : indexResponses) { + for (FieldCapabilitiesIndexResponse response : indexResponses.values()) { innerMerge(responseMapBuilder, response); } final Map> responseMap = new HashMap<>(); @@ -379,24 +377,35 @@ private void innerMerge(Map> resp } } + /** + * Collects failures from all the individual index requests, then builds a failure list grouped by the underlying cause. + * + * This collector can contain a failure for an index even if one of its shards was successful. When building the final + * list, these failures will be skipped because they have no affect on the final response. + */ private static final class FailureCollector { - final Map, FieldCapabilitiesFailure> indexFailures = Collections.synchronizedMap( - new HashMap<>() - ); - - List values() { + private final Map failuresByIndex = Collections.synchronizedMap(new HashMap<>()); + + List build(Set successfulIndices) { + Map, FieldCapabilitiesFailure> indexFailures = Collections.synchronizedMap(new HashMap<>()); + for (Map.Entry failure : failuresByIndex.entrySet()) { + String index = failure.getKey(); + Exception e = failure.getValue(); + + if (successfulIndices.contains(index) == false) { + // we deduplicate exceptions on the underlying causes message and classname + // we unwrap the cause to e.g. group RemoteTransportExceptions coming from different nodes if the cause is the same + Throwable cause = ExceptionsHelper.unwrapCause(e); + Tuple groupingKey = new Tuple<>(cause.getMessage(), cause.getClass().getName()); + indexFailures.compute(groupingKey, + (k, v) -> v == null ? new FieldCapabilitiesFailure(new String[]{index}, e) : v.addIndex(index)); + } + } return new ArrayList<>(indexFailures.values()); } void collect(Exception e, String index) { - // we deduplicate exceptions on the underlying causes message and classname - // we unwrap the cause to e.g. group RemoteTransportexceptions coming from different nodes if the cause is the same - Throwable cause = ExceptionsHelper.unwrapCause(e); - Tuple groupingKey = new Tuple(cause.getMessage(), cause.getClass().getName()); - indexFailures.compute( - groupingKey, - (k, v) -> v == null ? new FieldCapabilitiesFailure(new String[] {index}, e) : v.addIndex(index) - ); + failuresByIndex.putIfAbsent(index, e); } void collect(Exception e, String[] indices) { @@ -411,8 +420,8 @@ void collectRemoteException(Exception ex, String clusterAlias, String[] remoteIn } } - int size() { - return this.indexFailures.size(); + boolean isEmpty() { + return failuresByIndex.isEmpty(); } } From 2ce1d685a5b77aa6a002c9dc758c88130dc128f4 Mon Sep 17 00:00:00 2001 From: Julie Tibshirani Date: Mon, 30 Aug 2021 16:56:52 -0700 Subject: [PATCH 5/8] Also support grouping by node with index_filter --- .../search/fieldcaps/FieldCapabilitiesIT.java | 8 +++--- .../TransportFieldCapabilitiesAction.java | 25 +++++++++++++------ 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java index 15925aadf3bc1..5e6909c70b877 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java @@ -17,9 +17,9 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.mapper.DocumentParserContext; import org.elasticsearch.index.mapper.KeywordFieldMapper; import org.elasticsearch.index.mapper.MetadataFieldMapper; -import org.elasticsearch.index.mapper.DocumentParserContext; import org.elasticsearch.index.query.AbstractQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -29,7 +29,6 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.SearchPlugin; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.transport.RemoteTransportException; import org.junit.Before; import java.io.IOException; @@ -298,9 +297,8 @@ public void testFailures() throws InterruptedException { assertEquals(2, response.getFailedIndices().length); assertThat(response.getFailures().get(0).getIndices(), arrayContainingInAnyOrder("index1-error", "index2-error")); Exception failure = response.getFailures().get(0).getException(); - assertEquals(RemoteTransportException.class, failure.getClass()); - assertEquals(IllegalArgumentException.class, failure.getCause().getClass()); - assertEquals("I throw because I choose to.", failure.getCause().getMessage()); + assertEquals(IllegalArgumentException.class, failure.getClass()); + assertEquals("I throw because I choose to.", failure.getMessage()); // the "indices" section should not include failed ones assertThat(Arrays.asList(response.getIndices()), containsInAnyOrder("old_index", "new_index")); diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 785eadf85013b..0efe12adba73a 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -114,8 +114,8 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti // Otherwise, for backwards compatibility we follow the old strategy of sending a separate request per shard. final Map> shardsByNode; final CountDown completionCounter; - if (clusterState.getNodes().getMinNodeVersion().onOrAfter(Version.V_7_16_0) && request.indexFilter() == null) { - shardsByNode = groupShardsByNode(clusterState, concreteIndices, indexFailures); + if (clusterState.getNodes().getMinNodeVersion().onOrAfter(Version.V_7_16_0)) { + shardsByNode = groupShardsByNode(request, clusterState, concreteIndices, indexFailures); completionCounter = new CountDown(shardsByNode.size() + remoteClusterIndices.size()); } else { shardsByNode = null; @@ -145,7 +145,6 @@ public void onResponse(FieldCapabilitiesNodeResponse response) { if (indexResponse.canMatch()) { indexResponses.putIfAbsent(indexResponse.getIndexName(), indexResponse); } - } for (FieldCapabilitiesFailure indexFailure : response.getFailures()) { indexFailures.collect(indexFailure.getException(), indexFailure.getIndices()); @@ -289,9 +288,13 @@ private static FieldCapabilitiesRequest prepareRemoteRequest(FieldCapabilitiesRe return remoteRequest; } - private Map> groupShardsByNode(ClusterState clusterState, + private Map> groupShardsByNode(FieldCapabilitiesRequest request, + ClusterState clusterState, String[] concreteIndices, FailureCollector indexFailures) { + // If an index filter is specified, then we must reach out to all of an index's shards to check + // if one of them can match. Otherwise, for efficiency we just reach out to one of its shards. + boolean includeAllShards = request.indexFilter() != null; Map> shardsByNodeId = new HashMap<>(); for (String indexName : concreteIndices) { GroupShardsIterator shards = clusterService.operationRouting() @@ -310,11 +313,20 @@ private Map> groupShardsByNode(ClusterState clusterState, String nodeId = selectedCopy.currentNodeId(); List shardGroup = shardsByNodeId.computeIfAbsent(nodeId, key -> new ArrayList<>()); shardGroup.add(selectedCopy.shardId()); - break; + if (includeAllShards == false) { + // We only need one shard for this index, so stop early + break; + } + } else if (includeAllShards) { + // We need all index shards but couldn't find a shard copy + Exception e = new NoShardAvailableActionException(shardCopies.shardId(), + LoggerMessageFormat.format("No shard available for index [{}]", indexName)); + indexFailures.collect(e, indexName); } } - if (selectedCopy == null) { + if (includeAllShards == false && selectedCopy == null) { + // We only needed one shard for the index but couldn't find any shard copy Exception e = new NoShardAvailableActionException(null, LoggerMessageFormat.format("No shard available for index [{}]", indexName)); indexFailures.collect(e, indexName); @@ -346,7 +358,6 @@ private FieldCapabilitiesResponse merge( } responseMap.put(entry.getKey(), Collections.unmodifiableMap(typeMap)); } - // de-dup failures return new FieldCapabilitiesResponse(indices, Collections.unmodifiableMap(responseMap), failures); } From 8b6b9776eb4e5d2472c0af666c8bb1dde3d0eab7 Mon Sep 17 00:00:00 2001 From: Julie Tibshirani Date: Tue, 31 Aug 2021 12:54:56 -0700 Subject: [PATCH 6/8] Fix test failures --- .../java/org/elasticsearch/action/IndicesRequestIT.java | 2 +- .../search/fieldcaps/CCSFieldCapabilitiesIT.java | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/IndicesRequestIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/IndicesRequestIT.java index 444f45144fbb3..e31eb7dfb70e1 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/IndicesRequestIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/IndicesRequestIT.java @@ -179,7 +179,7 @@ public void testGetFieldMappings() { } public void testFieldCapabilities() { - String fieldCapabilitiesShardAction = FieldCapabilitiesAction.NAME + "[index][s]"; + String fieldCapabilitiesShardAction = FieldCapabilitiesAction.NAME + "[n]"; interceptTransportActions(fieldCapabilitiesShardAction); FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/CCSFieldCapabilitiesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/CCSFieldCapabilitiesIT.java index 46ffeb98ddf65..899a0b4f222ab 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/CCSFieldCapabilitiesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/CCSFieldCapabilitiesIT.java @@ -102,9 +102,7 @@ public void testFailuresFromRemote() { .filter(f -> Arrays.asList(f.getIndices()).contains("remote_cluster:" + remoteErrorIndex)) .findFirst().get(); ex = failure.getException(); - assertEquals(RemoteTransportException.class, ex.getClass()); - cause = ExceptionsHelper.unwrapCause(ex); - assertEquals(IllegalArgumentException.class, cause.getClass()); - assertEquals("I throw because I choose to.", cause.getMessage()); + assertEquals(IllegalArgumentException.class, ex.getClass()); + assertEquals("I throw because I choose to.", ex.getMessage()); } } From 5d95c5c280a8d2cb535321ccb3be57dc71559d38 Mon Sep 17 00:00:00 2001 From: Julie Tibshirani Date: Thu, 23 Sep 2021 18:03:25 -0700 Subject: [PATCH 7/8] Return error when node is no longer available --- .../TransportFieldCapabilitiesAction.java | 53 ++++++++++--------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 0efe12adba73a..346c363bd89d4 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -19,6 +19,7 @@ import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -133,34 +134,38 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti String nodeId = entry.getKey(); List shardIds = entry.getValue(); - DiscoveryNode node = clusterState.getNodes().get(nodeId); - assert node != null; - - FieldCapabilitiesNodeRequest nodeRequest = prepareLocalNodeRequest(request, shardIds, localIndices, nowInMillis); - transportService.sendRequest(node, ACTION_NODE_NAME, nodeRequest, new ActionListenerResponseHandler<>( - new ActionListener() { - @Override - public void onResponse(FieldCapabilitiesNodeResponse response) { - for (FieldCapabilitiesIndexResponse indexResponse : response.getIndexResponses()) { - if (indexResponse.canMatch()) { - indexResponses.putIfAbsent(indexResponse.getIndexName(), indexResponse); - } - } - for (FieldCapabilitiesFailure indexFailure : response.getFailures()) { - indexFailures.collect(indexFailure.getException(), indexFailure.getIndices()); + ActionListener nodeListener = new ActionListener() { + @Override + public void onResponse(FieldCapabilitiesNodeResponse response) { + for (FieldCapabilitiesIndexResponse indexResponse : response.getIndexResponses()) { + if (indexResponse.canMatch()) { + indexResponses.putIfAbsent(indexResponse.getIndexName(), indexResponse); } - countDown.run(); } + for (FieldCapabilitiesFailure indexFailure : response.getFailures()) { + indexFailures.collect(indexFailure.getException(), indexFailure.getIndices()); + } + countDown.run(); + } - @Override - public void onFailure(Exception e) { - for (ShardId shardId : shardIds) { - indexFailures.collect(e, shardId.getIndexName()); - } - countDown.run(); + @Override + public void onFailure(Exception e) { + for (ShardId shardId : shardIds) { + indexFailures.collect(e, shardId.getIndexName()); } - }, - FieldCapabilitiesNodeResponse::new)); + countDown.run(); + } + }; + + DiscoveryNode node = clusterState.getNodes().get(nodeId); + if (node == null) { + nodeListener.onFailure(new NoNodeAvailableException("node [" + nodeId + "] is not available")); + } else { + FieldCapabilitiesNodeRequest nodeRequest = prepareLocalNodeRequest( + request, shardIds, localIndices, nowInMillis); + transportService.sendRequest(node, ACTION_NODE_NAME, nodeRequest, new ActionListenerResponseHandler<>( + nodeListener, FieldCapabilitiesNodeResponse::new)); + } } } else { for (String index : concreteIndices) { From 4efd602076c3b3094b33b3b4ba46c1cab7ffa280 Mon Sep 17 00:00:00 2001 From: Julie Tibshirani Date: Fri, 24 Sep 2021 18:59:08 -0700 Subject: [PATCH 8/8] Skip over shard if we already found a match for that index --- .../TransportFieldCapabilitiesAction.java | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 346c363bd89d4..bb55a1769efde 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -451,16 +451,24 @@ public void messageReceived(final FieldCapabilitiesNodeRequest request, List indexResponses = new ArrayList<>(); List indexFailures = new ArrayList<>(); + // If the request has an index filter, it may contain several shards belonging to the same + // index. We make sure to skip over a shard if we already found a match for that index. + Set matchedIndices = new HashSet<>(); for (ShardId shardId : request.shardIds()) { - FieldCapabilitiesIndexRequest indexRequest = new FieldCapabilitiesIndexRequest(request.fields(), shardId.getIndexName(), - request.originalIndices(), request.indexFilter(), request.nowInMillis(), request.runtimeFields()); - indexRequest.shardId(shardId); - try { - FieldCapabilitiesIndexResponse indexResponse = fieldCapabilitiesFetcher.fetch(indexRequest); - indexResponses.add(indexResponse); - } catch (Exception e) { - FieldCapabilitiesFailure failure = new FieldCapabilitiesFailure(new String[] {shardId.getIndexName()}, e); - indexFailures.add(failure); + if (matchedIndices.contains(shardId.getIndexName()) == false) { + FieldCapabilitiesIndexRequest indexRequest = new FieldCapabilitiesIndexRequest(request.fields(), shardId.getIndexName(), + request.originalIndices(), request.indexFilter(), request.nowInMillis(), request.runtimeFields()); + indexRequest.shardId(shardId); + try { + FieldCapabilitiesIndexResponse indexResponse = fieldCapabilitiesFetcher.fetch(indexRequest); + indexResponses.add(indexResponse); + if (indexResponse.canMatch()) { + matchedIndices.add(shardId.getIndexName()); + } + } catch (Exception e) { + FieldCapabilitiesFailure failure = new FieldCapabilitiesFailure(new String[]{shardId.getIndexName()}, e); + indexFailures.add(failure); + } } }