diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/IndicesRequestIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/IndicesRequestIT.java index 5ebc70b225229..a36cce4c2a3f3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/IndicesRequestIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/IndicesRequestIT.java @@ -179,7 +179,7 @@ public void testGetFieldMappings() { } public void testFieldCapabilities() { - String fieldCapabilitiesShardAction = FieldCapabilitiesAction.NAME + "[index][s]"; + String fieldCapabilitiesShardAction = FieldCapabilitiesAction.NAME + "[n]"; interceptTransportActions(fieldCapabilitiesShardAction); FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/CCSFieldCapabilitiesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/CCSFieldCapabilitiesIT.java index 46ffeb98ddf65..899a0b4f222ab 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/CCSFieldCapabilitiesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/CCSFieldCapabilitiesIT.java @@ -102,9 +102,7 @@ public void testFailuresFromRemote() { .filter(f -> Arrays.asList(f.getIndices()).contains("remote_cluster:" + remoteErrorIndex)) .findFirst().get(); ex = failure.getException(); - assertEquals(RemoteTransportException.class, ex.getClass()); - cause = ExceptionsHelper.unwrapCause(ex); - assertEquals(IllegalArgumentException.class, cause.getClass()); - assertEquals("I throw because I choose to.", cause.getMessage()); + assertEquals(IllegalArgumentException.class, ex.getClass()); + assertEquals("I throw because I choose to.", ex.getMessage()); } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java index b522780c49daf..4531e56268d92 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/fieldcaps/FieldCapabilitiesIT.java @@ -10,16 +10,34 @@ import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.fieldcaps.FieldCapabilities; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; +import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.mapper.DocumentParserContext; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.index.mapper.KeywordFieldMapper; import org.elasticsearch.index.mapper.MetadataFieldMapper; -import org.elasticsearch.index.mapper.DocumentParserContext; import org.elasticsearch.index.query.AbstractQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -29,7 +47,6 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.SearchPlugin; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.transport.RemoteTransportException; import org.junit.Before; import java.io.IOException; @@ -40,16 +57,28 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Predicate; import static java.util.Collections.singletonList; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.aMapWithSize; import static org.hamcrest.Matchers.arrayContainingInAnyOrder; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.hasSize; public class FieldCapabilitiesIT extends ESIntegTestCase { + @Override + protected Collection> getMockPlugins() { + final Collection> plugins = new ArrayList<>(super.getMockPlugins()); + plugins.add(MockTransportService.TestPlugin.class); + return plugins; + } + @Override @Before public void setUp() throws Exception { @@ -298,9 +327,8 @@ public void testFailures() throws InterruptedException { assertEquals(2, response.getFailedIndices().length); assertThat(response.getFailures().get(0).getIndices(), arrayContainingInAnyOrder("index1-error", "index2-error")); Exception failure = response.getFailures().get(0).getException(); - assertEquals(RemoteTransportException.class, failure.getClass()); - assertEquals(IllegalArgumentException.class, failure.getCause().getClass()); - assertEquals("I throw because I choose to.", failure.getCause().getMessage()); + assertEquals(IllegalArgumentException.class, failure.getClass()); + assertEquals("I throw because I choose to.", failure.getMessage()); // the "indices" section should not include failed ones assertThat(Arrays.asList(response.getIndices()), containsInAnyOrder("old_index", "new_index")); @@ -315,6 +343,163 @@ public void testFailures() throws InterruptedException { assertEquals("I throw because I choose to.", ex.getMessage()); } + private void populateTimeRangeIndices() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + assertAcked(prepareCreate("log-index-1") + .setSettings(Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + .addMapping("_doc", "timestamp", "type=date", "field1", "type=keyword")); + assertAcked(prepareCreate("log-index-2") + .setSettings(Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + .addMapping("_doc", "timestamp", "type=date", "field1", "type=long")); + List reqs = new ArrayList<>(); + reqs.add(client().prepareIndex("log-index-1", "_doc").setSource("timestamp", "2015-07-08")); + reqs.add(client().prepareIndex("log-index-1", "_doc").setSource("timestamp", "2018-07-08")); + reqs.add(client().prepareIndex("log-index-1", "_doc").setSource("timestamp", "2020-03-03")); + reqs.add(client().prepareIndex("log-index-1", "_doc").setSource("timestamp", "2020-09-09")); + reqs.add(client().prepareIndex("log-index-2", "_doc").setSource("timestamp", "2019-10-12")); + reqs.add(client().prepareIndex("log-index-2", "_doc").setSource("timestamp", "2020-02-02")); + reqs.add(client().prepareIndex("log-index-2", "_doc").setSource("timestamp", "2020-10-10")); + indexRandom(true, reqs); + ensureGreen("log-index-1", "log-index-2"); + client().admin().indices().prepareRefresh("log-index-1", "log-index-2").get(); + } + + public void testTargetNodeFails() throws Exception { + populateTimeRangeIndices(); + try { + final AtomicBoolean failedRequest = new AtomicBoolean(); + for (String node : internalCluster().getNodeNames()) { + MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node); + transportService.addRequestHandlingBehavior(TransportFieldCapabilitiesAction.ACTION_NODE_NAME, + (handler, request, channel, task) -> { + if (failedRequest.compareAndSet(false, true)) { + channel.sendResponse(new CircuitBreakingException("Simulated", CircuitBreaker.Durability.TRANSIENT)); + } else { + handler.messageReceived(request, channel, task); + } + }); + } + FieldCapabilitiesRequest request = new FieldCapabilitiesRequest(); + request.indices("log-index-*"); + request.fields("*"); + if (randomBoolean()) { + request.indexFilter(QueryBuilders.rangeQuery("timestamp").gte("2020-01-01")); + } + final FieldCapabilitiesResponse response = client().execute(FieldCapabilitiesAction.INSTANCE, request).actionGet(); + assertTrue(failedRequest.get()); + assertThat(response.getIndices(), arrayContainingInAnyOrder("log-index-1", "log-index-2")); + assertThat(response.getField("field1"), aMapWithSize(2)); + assertThat(response.getField("field1"), hasKey("long")); + assertThat(response.getField("field1"), hasKey("keyword")); + } finally { + for (String node : internalCluster().getNodeNames()) { + MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node); + transportService.clearAllRules(); + } + } + } + + public void testNoActiveCopy() throws Exception { + assertAcked(prepareCreate("log-index-inactive") + .setSettings(Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.routing.allocation.require._id", "unknown")) + .setWaitForActiveShards(ActiveShardCount.NONE) + .addMapping("_doc", "timestamp", "type=date", "field1", "type=keyword")); + { + final ElasticsearchException ex = + expectThrows(ElasticsearchException.class, () -> client().prepareFieldCaps("log-index-*").setFields("*").get()); + assertThat(ex.getMessage(), equalTo("index [log-index-inactive] has no active shard copy")); + } + { + populateTimeRangeIndices(); + FieldCapabilitiesRequest request = new FieldCapabilitiesRequest(); + request.indices("log-index-*"); + request.fields("*"); + if (randomBoolean()) { + request.indexFilter(QueryBuilders.rangeQuery("timestamp").gte("2020-01-01")); + } + final FieldCapabilitiesResponse response = client().execute(FieldCapabilitiesAction.INSTANCE, request).actionGet(); + assertThat(response.getIndices(), arrayContainingInAnyOrder("log-index-1", "log-index-2")); + assertThat(response.getField("field1"), aMapWithSize(2)); + assertThat(response.getField("field1"), hasKey("long")); + assertThat(response.getField("field1"), hasKey("long")); + + assertThat(response.getFailures(), hasSize(1)); + final FieldCapabilitiesFailure failure = response.getFailures().get(0); + assertThat(failure.getIndices(), arrayContainingInAnyOrder("log-index-inactive")); + assertThat(failure.getException().getMessage(), equalTo("index [log-index-inactive] has no active shard copy")); + } + } + + private void moveOrCloseShardsOnNodes(String nodeName) throws Exception { + final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName); + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + if (randomBoolean()) { + indexShard.close("test", randomBoolean()); + } else if (randomBoolean()) { + final ShardId shardId = indexShard.shardId(); + final String[] nodeNames = internalCluster().getNodeNames(); + final String newNodeName = randomValueOtherThanMany(n -> nodeName.equals(n) == false, () -> randomFrom(nodeNames)); + DiscoveryNode fromNode = null; + DiscoveryNode toNode = null; + for (DiscoveryNode node : clusterService().state().nodes()) { + if (node.getName().equals(nodeName)) { + fromNode = node; + } + if (node.getName().equals(newNodeName)) { + toNode = node; + } + } + assertNotNull(fromNode); + assertNotNull(toNode); + client().admin().cluster().prepareReroute() + .add(new MoveAllocationCommand(shardId.getIndexName(), shardId.id(), fromNode.getId(), toNode.getId())) + .execute().actionGet(); + } + } + } + } + + public void testRelocation() throws Exception { + populateTimeRangeIndices(); + try { + final AtomicBoolean relocated = new AtomicBoolean(); + for (String node : internalCluster().getNodeNames()) { + MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node); + transportService.addRequestHandlingBehavior(TransportFieldCapabilitiesAction.ACTION_NODE_NAME, + (handler, request, channel, task) -> { + if (relocated.compareAndSet(false, true)) { + moveOrCloseShardsOnNodes(node); + } + handler.messageReceived(request, channel, task); + }); + } + FieldCapabilitiesRequest request = new FieldCapabilitiesRequest(); + request.indices("log-index-*"); + request.fields("*"); + if (randomBoolean()) { + request.indexFilter(QueryBuilders.rangeQuery("timestamp").gte("2020-01-01")); + } + final FieldCapabilitiesResponse response = client().execute(FieldCapabilitiesAction.INSTANCE, request).actionGet(); + assertThat(response.getIndices(), arrayContainingInAnyOrder("log-index-1", "log-index-2")); + assertThat(response.getField("field1"), aMapWithSize(2)); + assertThat(response.getField("field1"), hasKey("long")); + assertThat(response.getField("field1"), hasKey("long")); + } finally { + for (String node : internalCluster().getNodeNames()) { + MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node); + transportService.clearAllRules(); + } + } + } + private void assertIndices(FieldCapabilitiesResponse response, String... indices) { assertNotNull(response.getIndices()); Arrays.sort(indices); diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 4e2b70b74ebf3..5f2085a30ef6e 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -205,7 +205,6 @@ import org.elasticsearch.action.explain.TransportExplainAction; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction; import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction; -import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesIndexAction; import org.elasticsearch.action.get.GetAction; import org.elasticsearch.action.get.MultiGetAction; import org.elasticsearch.action.get.TransportGetAction; @@ -635,8 +634,7 @@ public void reg actions.register(GetScriptContextAction.INSTANCE, TransportGetScriptContextAction.class); actions.register(GetScriptLanguageAction.INSTANCE, TransportGetScriptLanguageAction.class); - actions.register(FieldCapabilitiesAction.INSTANCE, TransportFieldCapabilitiesAction.class, - TransportFieldCapabilitiesIndexAction.class); + actions.register(FieldCapabilitiesAction.INSTANCE, TransportFieldCapabilitiesAction.class); actions.register(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class); actions.register(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class); diff --git a/server/src/main/java/org/elasticsearch/action/OriginalIndices.java b/server/src/main/java/org/elasticsearch/action/OriginalIndices.java index 7b2f03287bb7a..82627211fd7b4 100644 --- a/server/src/main/java/org/elasticsearch/action/OriginalIndices.java +++ b/server/src/main/java/org/elasticsearch/action/OriginalIndices.java @@ -14,6 +14,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Objects; /** * Used to keep track of original indices within internal (e.g. shard level) requests @@ -67,4 +68,19 @@ public String toString() { ", indicesOptions=" + indicesOptions + '}'; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + OriginalIndices that = (OriginalIndices) o; + return Arrays.equals(indices, that.indices) && Objects.equals(indicesOptions, that.indicesOptions); + } + + @Override + public int hashCode() { + int result = Objects.hash(indicesOptions); + result = 31 * result + Arrays.hashCode(indices); + return result; + } } diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFailure.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFailure.java index ec7cbea867c21..e3d9e5d9498a5 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFailure.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFailure.java @@ -101,9 +101,4 @@ FieldCapabilitiesFailure addIndex(String index) { this.indices.add(index); return this; } - - FieldCapabilitiesFailure addIndices(List indices) { - this.indices.addAll(indices); - return this; - } } diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFetcher.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFetcher.java new file mode 100644 index 0000000000000..6d6b542d1c957 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesFetcher.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.fieldcaps; + +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.ObjectMapper; +import org.elasticsearch.index.mapper.RuntimeField; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.query.SearchExecutionContext; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.ShardSearchRequest; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; + +/** + * Loads the mappings for an index and computes all {@link IndexFieldCapabilities}. This + * helper class performs the core shard operation for the field capabilities action. + */ +class FieldCapabilitiesFetcher { + private final IndicesService indicesService; + + FieldCapabilitiesFetcher(IndicesService indicesService) { + this.indicesService = indicesService; + } + + public FieldCapabilitiesIndexResponse fetch(final FieldCapabilitiesIndexRequest request) throws IOException { + final ShardId shardId = request.shardId(); + final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + final IndexShard indexShard = indexService.getShard(request.shardId().getId()); + try (Engine.Searcher searcher = indexShard.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE)) { + + final SearchExecutionContext searchExecutionContext = indexService.newSearchExecutionContext(shardId.id(), 0, + searcher, request::nowInMillis, null, request.runtimeFields()); + + if (canMatchShard(request, searchExecutionContext) == false) { + return new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false); + } + + Set fieldNames = new HashSet<>(); + for (String pattern : request.fields()) { + fieldNames.addAll(searchExecutionContext.getMatchingFieldNames(pattern)); + } + + Predicate fieldPredicate = indicesService.getFieldFilter().apply(shardId.getIndexName()); + Map responseMap = new HashMap<>(); + for (String field : fieldNames) { + MappedFieldType ft = searchExecutionContext.getFieldType(field); + boolean isMetadataField = searchExecutionContext.isMetadataField(field); + if (isMetadataField || fieldPredicate.test(ft.name())) { + IndexFieldCapabilities fieldCap = new IndexFieldCapabilities(field, + ft.familyTypeName(), isMetadataField, ft.isSearchable(), ft.isAggregatable(), ft.meta()); + responseMap.put(field, fieldCap); + } else { + continue; + } + + // Check the ancestor of the field to find nested and object fields. + // Runtime fields are excluded since they can override any path. + //TODO find a way to do this that does not require an instanceof check + if (ft instanceof RuntimeField == false) { + int dotIndex = ft.name().lastIndexOf('.'); + while (dotIndex > -1) { + String parentField = ft.name().substring(0, dotIndex); + if (responseMap.containsKey(parentField)) { + // we added this path on another field already + break; + } + // checks if the parent field contains sub-fields + if (searchExecutionContext.getFieldType(parentField) == null) { + // no field type, it must be an object field + ObjectMapper mapper = searchExecutionContext.getObjectMapper(parentField); + // Composite runtime fields do not have a mapped type for the root - check for null + if (mapper != null) { + String type = mapper.isNested() ? "nested" : "object"; + IndexFieldCapabilities fieldCap = new IndexFieldCapabilities(parentField, type, + false, false, false, Collections.emptyMap()); + responseMap.put(parentField, fieldCap); + } + } + dotIndex = parentField.lastIndexOf('.'); + } + } + } + return new FieldCapabilitiesIndexResponse(request.index(), responseMap, true); + } + } + + private boolean canMatchShard(FieldCapabilitiesIndexRequest req, SearchExecutionContext searchExecutionContext) throws IOException { + if (req.indexFilter() == null || req.indexFilter() instanceof MatchAllQueryBuilder) { + return true; + } + assert req.nowInMillis() != 0L; + ShardSearchRequest searchRequest = new ShardSearchRequest(req.shardId(), null, req.nowInMillis(), AliasFilter.EMPTY); + searchRequest.source(new SearchSourceBuilder().query(req.indexFilter())); + return SearchService.queryStillMatchesAfterRewrite(searchRequest, searchExecutionContext); + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexRequest.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexRequest.java index 0ac89d500a947..7eb9028404c64 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexRequest.java @@ -22,26 +22,25 @@ import java.io.IOException; import java.util.Collections; import java.util.Map; -import java.util.Objects; public class FieldCapabilitiesIndexRequest extends ActionRequest implements IndicesRequest { public static final IndicesOptions INDICES_OPTIONS = IndicesOptions.strictSingleIndexNoExpandForbidClosed(); - private final String index; private final String[] fields; private final OriginalIndices originalIndices; private final QueryBuilder indexFilter; private final long nowInMillis; private final Map runtimeFields; - - private ShardId shardId; + private final ShardId shardId; // For serialization FieldCapabilitiesIndexRequest(StreamInput in) throws IOException { super(in); shardId = in.readOptionalWriteable(ShardId::new); - index = in.readOptionalString(); + if (in.getVersion().before(Version.V_7_16_0)) { + in.readOptionalString(); // index + } fields = in.readStringArray(); if (in.getVersion().onOrAfter(Version.V_6_2_0)) { originalIndices = OriginalIndices.readOriginalIndices(in); @@ -54,7 +53,7 @@ public class FieldCapabilitiesIndexRequest extends ActionRequest implements Indi } FieldCapabilitiesIndexRequest(String[] fields, - String index, + ShardId shardId, OriginalIndices originalIndices, QueryBuilder indexFilter, long nowInMillis, @@ -62,7 +61,7 @@ public class FieldCapabilitiesIndexRequest extends ActionRequest implements Indi if (fields == null || fields.length == 0) { throw new IllegalArgumentException("specified fields can't be null or empty"); } - this.index = Objects.requireNonNull(index); + this.shardId = shardId; this.fields = fields; this.originalIndices = originalIndices; this.indexFilter = indexFilter; @@ -85,7 +84,7 @@ public IndicesOptions indicesOptions() { } public String index() { - return index; + return shardId.getIndexName(); } public QueryBuilder indexFilter() { @@ -104,16 +103,14 @@ public long nowInMillis() { return nowInMillis; } - FieldCapabilitiesIndexRequest shardId(ShardId shardId) { - this.shardId = shardId; - return this; - } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeOptionalWriteable(shardId); - out.writeOptionalString(index); + if (out.getVersion().before(Version.V_7_16_0)) { + out.writeOptionalString(shardId.getIndexName()); + } out.writeStringArray(fields); if (out.getVersion().onOrAfter(Version.V_6_2_0)) { OriginalIndices.writeOriginalIndices(originalIndices, out); diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java index 919f131cd80cf..d25edeafe6071 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexResponse.java @@ -18,9 +18,6 @@ import java.util.Map; import java.util.Objects; -/** - * Response for {@link TransportFieldCapabilitiesIndexAction}. - */ public class FieldCapabilitiesIndexResponse extends ActionResponse implements Writeable { private final String indexName; private final Map responseMap; diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeRequest.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeRequest.java new file mode 100644 index 0000000000000..2d5d23421cc7a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeRequest.java @@ -0,0 +1,127 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.fieldcaps; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +class FieldCapabilitiesNodeRequest extends ActionRequest implements IndicesRequest { + + private final List shardIds; + private final String[] fields; + private final OriginalIndices originalIndices; + private final QueryBuilder indexFilter; + private final long nowInMillis; + private final Map runtimeFields; + + FieldCapabilitiesNodeRequest(StreamInput in) throws IOException { + super(in); + shardIds = in.readList(ShardId::new); + fields = in.readStringArray(); + originalIndices = OriginalIndices.readOriginalIndices(in); + indexFilter = in.readOptionalNamedWriteable(QueryBuilder.class); + nowInMillis = in.readLong(); + runtimeFields = in.readMap(); + } + + FieldCapabilitiesNodeRequest(List shardIds, + String[] fields, + OriginalIndices originalIndices, + QueryBuilder indexFilter, + long nowInMillis, + Map runtimeFields) { + this.shardIds = Objects.requireNonNull(shardIds); + this.fields = fields; + this.originalIndices = originalIndices; + this.indexFilter = indexFilter; + this.nowInMillis = nowInMillis; + this.runtimeFields = runtimeFields; + } + + public String[] fields() { + return fields; + } + + public OriginalIndices originalIndices() { + return originalIndices; + } + + @Override + public String[] indices() { + return originalIndices.indices(); + } + + @Override + public IndicesOptions indicesOptions() { + return originalIndices.indicesOptions(); + } + + public QueryBuilder indexFilter() { + return indexFilter; + } + + public Map runtimeFields() { + return runtimeFields; + } + + public List shardIds() { + return shardIds; + } + + public long nowInMillis() { + return nowInMillis; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(shardIds); + out.writeStringArray(fields); + OriginalIndices.writeOriginalIndices(originalIndices, out); + out.writeOptionalNamedWriteable(indexFilter); + out.writeLong(nowInMillis); + out.writeMap(runtimeFields); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FieldCapabilitiesNodeRequest that = (FieldCapabilitiesNodeRequest) o; + return nowInMillis == that.nowInMillis && shardIds.equals(that.shardIds) + && Arrays.equals(fields, that.fields) && Objects.equals(originalIndices, that.originalIndices) + && Objects.equals(indexFilter, that.indexFilter) && Objects.equals(runtimeFields, that.runtimeFields); + } + + @Override + public int hashCode() { + int result = Objects.hash(originalIndices, indexFilter, nowInMillis, runtimeFields); + result = 31 * result + shardIds.hashCode(); + result = 31 * result + Arrays.hashCode(fields); + return result; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponse.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponse.java new file mode 100644 index 0000000000000..2cb3dda8bb9a3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponse.java @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.fieldcaps; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +class FieldCapabilitiesNodeResponse extends ActionResponse implements Writeable { + private final List indexResponses; + private final Map failures; + private final Set unmatchedShardIds; + + FieldCapabilitiesNodeResponse(List indexResponses, + Map failures, + Set unmatchedShardIds) { + this.indexResponses = Objects.requireNonNull(indexResponses); + this.failures = Objects.requireNonNull(failures); + this.unmatchedShardIds = Objects.requireNonNull(unmatchedShardIds); + } + + FieldCapabilitiesNodeResponse(StreamInput in) throws IOException { + super(in); + this.indexResponses = in.readList(FieldCapabilitiesIndexResponse::new); + this.failures = in.readMap(ShardId::new, StreamInput::readException); + this.unmatchedShardIds = in.readSet(ShardId::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(indexResponses); + out.writeMap(failures, (o, v) -> v.writeTo(o), StreamOutput::writeException); + out.writeCollection(unmatchedShardIds); + } + + public Map getFailures() { + return failures; + } + + public List getIndexResponses() { + return indexResponses; + } + + public Set getUnmatchedShardIds() { + return unmatchedShardIds; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FieldCapabilitiesNodeResponse that = (FieldCapabilitiesNodeResponse) o; + return Objects.equals(indexResponses, that.indexResponses) && Objects.equals(failures, that.failures) + && unmatchedShardIds.equals(that.unmatchedShardIds); + } + + @Override + public int hashCode() { + return Objects.hash(indexResponses, failures, unmatchedShardIds); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java new file mode 100644 index 0000000000000..057c405ae78af --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/RequestDispatcher.java @@ -0,0 +1,327 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.fieldcaps; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.NoShardAvailableActionException; +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +/** + * Dispatches child field-caps requests to old/new data nodes in the local cluster that have shards of the requesting indices. + */ +final class RequestDispatcher { + + static final Version GROUP_REQUESTS_VERSION = Version.V_7_16_0; + + static final Logger LOGGER = LogManager.getLogger(RequestDispatcher.class); + + private final TransportService transportService; + private final ClusterState clusterState; + private final FieldCapabilitiesRequest fieldCapsRequest; + private final Task parentTask; + private final OriginalIndices originalIndices; + private final long nowInMillis; + + private final boolean hasFilter; + private final Executor executor; + private final Consumer onIndexResponse; + private final BiConsumer onIndexFailure; + private final Runnable onComplete; + + private final AtomicInteger pendingRequests = new AtomicInteger(); + private final AtomicInteger executionRound = new AtomicInteger(); + private final Map indexSelectors; + + RequestDispatcher(ClusterService clusterService, TransportService transportService, Task parentTask, + FieldCapabilitiesRequest fieldCapsRequest, OriginalIndices originalIndices, long nowInMillis, + String[] indices, Executor executor, Consumer onIndexResponse, + BiConsumer onIndexFailure, Runnable onComplete) { + this.transportService = transportService; + this.fieldCapsRequest = fieldCapsRequest; + this.parentTask = parentTask; + this.originalIndices = originalIndices; + this.nowInMillis = nowInMillis; + this.clusterState = clusterService.state(); + this.hasFilter = + fieldCapsRequest.indexFilter() != null && fieldCapsRequest.indexFilter() instanceof MatchAllQueryBuilder == false; + this.executor = executor; + this.onIndexResponse = onIndexResponse; + this.onIndexFailure = onIndexFailure; + this.onComplete = new RunOnce(onComplete); + this.indexSelectors = ConcurrentCollections.newConcurrentMap(); + for (String index : indices) { + final GroupShardsIterator shardIts = + clusterService.operationRouting().searchShards(clusterState, new String[]{index}, null, null, null, null); + final IndexSelector indexResult = new IndexSelector(shardIts); + if (indexResult.nodeToShards.isEmpty()) { + onIndexFailure.accept(index, new NoShardAvailableActionException(null, "index [" + index + "] has no active shard copy")); + } else { + this.indexSelectors.put(index, indexResult); + } + } + } + + void execute() { + executor.execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + // If we get rejected, mark pending indices as failed and complete + final List failedIndices = new ArrayList<>(indexSelectors.keySet()); + for (String failedIndex : failedIndices) { + final IndexSelector removed = indexSelectors.remove(failedIndex); + assert removed != null; + onIndexFailure.accept(failedIndex, e); + } + onComplete.run(); + } + + @Override + protected void doRun() { + innerExecute(); + } + }); + } + + private void innerExecute() { + final Map> nodeToSelectedShards = new HashMap<>(); + assert pendingRequests.get() == 0 : "pending requests = " + pendingRequests; + final List failedIndices = new ArrayList<>(); + for (Map.Entry e : indexSelectors.entrySet()) { + final String index = e.getKey(); + final IndexSelector indexSelector = e.getValue(); + final List selectedShards = indexSelector.nextTarget(clusterState.nodes(), hasFilter); + if (selectedShards.isEmpty()) { + failedIndices.add(index); + } else { + pendingRequests.addAndGet(selectedShards.size()); + for (ShardRouting shard : selectedShards) { + nodeToSelectedShards.computeIfAbsent(shard.currentNodeId(), n -> new ArrayList<>()).add(shard.shardId()); + } + } + } + for (String failedIndex : failedIndices) { + final IndexSelector indexSelector = indexSelectors.remove(failedIndex); + assert indexSelector != null; + final Exception failure = indexSelector.getFailure(); + if (failure != null) { + onIndexFailure.accept(failedIndex, failure); + } + } + if (nodeToSelectedShards.isEmpty()) { + onComplete.run(); + } else { + for (Map.Entry> e : nodeToSelectedShards.entrySet()) { + sendRequestToNode(e.getKey(), e.getValue()); + } + } + } + + // for testing + int executionRound() { + return executionRound.get(); + } + + private void sendRequestToNode(String nodeId, List shardIds) { + final DiscoveryNode node = clusterState.nodes().get(nodeId); + assert node != null; + if (node.getVersion().onOrAfter(GROUP_REQUESTS_VERSION)) { + LOGGER.debug("round {} sends field caps node request to node {} for shardIds {}", executionRound, node, shardIds); + final ActionListener listener = + ActionListener.wrap(r -> onRequestResponse(shardIds, r), failure -> onRequestFailure(shardIds, failure)); + final FieldCapabilitiesNodeRequest nodeRequest = new FieldCapabilitiesNodeRequest( + shardIds, + fieldCapsRequest.fields(), + originalIndices, + fieldCapsRequest.indexFilter(), + nowInMillis, + fieldCapsRequest.runtimeFields()); + transportService.sendChildRequest(node, TransportFieldCapabilitiesAction.ACTION_NODE_NAME, nodeRequest, parentTask, + TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, FieldCapabilitiesNodeResponse::new)); + } else { + for (ShardId shardId : shardIds) { + LOGGER.debug("round {} sends field caps shard request to node {} for shardId {}", executionRound, node, shardId); + final ActionListener listener = ActionListener.wrap( + r -> { + final FieldCapabilitiesNodeResponse nodeResponse; + if (r.canMatch()) { + nodeResponse = new FieldCapabilitiesNodeResponse( + Collections.singletonList(r), Collections.emptyMap(), Collections.emptySet()); + } else { + nodeResponse = new FieldCapabilitiesNodeResponse(Collections.emptyList(), Collections.emptyMap(), + Collections.singleton(shardId)); + } + onRequestResponse(Collections.singletonList(shardId), nodeResponse); + }, + e -> onRequestFailure(Collections.singletonList(shardId), e)); + final FieldCapabilitiesIndexRequest shardRequest = new FieldCapabilitiesIndexRequest(fieldCapsRequest.fields(), shardId, + originalIndices, fieldCapsRequest.indexFilter(), nowInMillis, fieldCapsRequest.runtimeFields()); + transportService.sendChildRequest(node, TransportFieldCapabilitiesAction.ACTION_SHARD_NAME, shardRequest, parentTask, + TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, FieldCapabilitiesIndexResponse::new)); + } + } + } + + private void afterRequestsCompleted(int numRequests) { + if (pendingRequests.addAndGet(-numRequests) == 0) { + // Here we only retry after all pending requests have responded to avoid exploding network requests + // when the cluster is unstable or overloaded as an eager retry approach can add more load to the cluster. + executionRound.incrementAndGet(); + execute(); + } + } + + private void onRequestResponse(List shardIds, FieldCapabilitiesNodeResponse nodeResponse) { + for (FieldCapabilitiesIndexResponse indexResponse : nodeResponse.getIndexResponses()) { + if (indexResponse.canMatch()) { + if (indexSelectors.remove(indexResponse.getIndexName()) != null) { + onIndexResponse.accept(indexResponse); + } + } + } + for (ShardId unmatchedShardId : nodeResponse.getUnmatchedShardIds()) { + final IndexSelector indexSelector = indexSelectors.get(unmatchedShardId.getIndexName()); + if (indexSelector != null) { + indexSelector.addUnmatchedShardId(unmatchedShardId); + } + } + for (Map.Entry e : nodeResponse.getFailures().entrySet()) { + final IndexSelector indexSelector = indexSelectors.get(e.getKey().getIndexName()); + if (indexSelector != null) { + indexSelector.setFailure(e.getKey(), e.getValue()); + } + } + afterRequestsCompleted(shardIds.size()); + } + + private void onRequestFailure(List shardIds, Exception e) { + for (ShardId shardId : shardIds) { + final IndexSelector indexSelector = indexSelectors.get(shardId.getIndexName()); + if (indexSelector != null) { + indexSelector.setFailure(shardId, e); + } + } + afterRequestsCompleted(shardIds.size()); + } + + private static class IndexSelector { + private final Map> nodeToShards = new HashMap<>(); + private final Set unmatchedShardIds = new HashSet<>(); + private final Map failures = new HashMap<>(); + + IndexSelector(GroupShardsIterator shardIts) { + for (ShardIterator shardIt : shardIts) { + for (ShardRouting shard : shardIt) { + nodeToShards.computeIfAbsent(shard.currentNodeId(), node -> new ArrayList<>()).add(shard); + } + } + } + + synchronized Exception getFailure() { + Exception first = null; + for (Exception e : failures.values()) { + first = ExceptionsHelper.useOrSuppress(first, e); + } + return first; + } + + synchronized void setFailure(ShardId shardId, Exception failure) { + assert unmatchedShardIds.contains(shardId) == false : "Shard " + shardId + " was unmatched already"; + failures.compute(shardId, (k, curr) -> ExceptionsHelper.useOrSuppress(curr, failure)); + } + + synchronized void addUnmatchedShardId(ShardId shardId) { + final boolean added = unmatchedShardIds.add(shardId); + assert added : "Shard " + shardId + " was unmatched already"; + failures.remove(shardId); + } + + synchronized List nextTarget(DiscoveryNodes discoveryNodes, boolean withQueryFilter) { + if (nodeToShards.isEmpty()) { + return Collections.emptyList(); + } + final Iterator>> nodeIt = nodeToShards.entrySet().iterator(); + if (withQueryFilter) { + // If an index filter is specified, then we must reach out to all of an index's shards to check + // if one of them can match. Otherwise, for efficiency we just reach out to one of its shards. + final List selectedShards = new ArrayList<>(); + final Set selectedShardIds = new HashSet<>(); + while (nodeIt.hasNext()) { + final List shards = nodeIt.next().getValue(); + final Iterator shardIt = shards.iterator(); + while (shardIt.hasNext()) { + final ShardRouting shard = shardIt.next(); + if (unmatchedShardIds.contains(shard.shardId())) { + shardIt.remove(); + continue; + } + if (selectedShardIds.add(shard.shardId())) { + shardIt.remove(); + selectedShards.add(shard); + } + } + if (shards.isEmpty()) { + nodeIt.remove(); + } + } + return selectedShards; + } else { + assert unmatchedShardIds.isEmpty(); + final Map.Entry> node = nodeIt.next(); + // If the target node is on the new version, then we can ask it to process all its copies in a single request + // and the target node will process at most one valid copy. Otherwise, we should ask for a single copy to avoid + // sending multiple requests. + final DiscoveryNode discoNode = discoveryNodes.get(node.getKey()); + if (discoNode.getVersion().onOrAfter(GROUP_REQUESTS_VERSION)) { + nodeIt.remove(); + return node.getValue(); + } else { + final List shards = node.getValue(); + final ShardRouting selectedShard = shards.remove(0); + if (shards.isEmpty()) { + nodeIt.remove(); + } + return Collections.singletonList(selectedShard); + } + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 728d2f39d2e59..917f3ca35de91 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -21,13 +22,16 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; -import org.elasticsearch.core.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportService; import java.util.ArrayList; @@ -39,12 +43,18 @@ import java.util.Map; import java.util.Set; import java.util.function.Predicate; +import java.util.stream.Collectors; public class TransportFieldCapabilitiesAction extends HandledTransportAction { + public static final String ACTION_NODE_NAME = FieldCapabilitiesAction.NAME + "[n]"; + public static final String ACTION_SHARD_NAME = FieldCapabilitiesAction.NAME + "[index][s]"; + private final ThreadPool threadPool; private final TransportService transportService; private final ClusterService clusterService; private final IndexNameExpressionResolver indexNameExpressionResolver; + + private final FieldCapabilitiesFetcher fieldCapabilitiesFetcher; private final Predicate metadataFieldPred; @Inject @@ -59,8 +69,15 @@ public TransportFieldCapabilitiesAction(TransportService transportService, this.transportService = transportService; this.clusterService = clusterService; this.indexNameExpressionResolver = indexNameExpressionResolver; + + this.fieldCapabilitiesFetcher = new FieldCapabilitiesFetcher(indicesService); final Set metadataFields = indicesService.getAllMetadataFields(); this.metadataFieldPred = metadataFields::contains; + + transportService.registerRequestHandler(ACTION_NODE_NAME, ThreadPool.Names.MANAGEMENT, + FieldCapabilitiesNodeRequest::new, new NodeTransportHandler()); + transportService.registerRequestHandler(ACTION_SHARD_NAME, ThreadPool.Names.SAME, + FieldCapabilitiesIndexRequest::new, new ShardTransportHandler()); } @Override @@ -80,47 +97,32 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, localIndices); } - checkIndexBlocks(clusterState, concreteIndices); - - final int totalNumRequest = concreteIndices.length + remoteClusterIndices.size(); - if (totalNumRequest == 0) { + if (concreteIndices.length == 0 && remoteClusterIndices.isEmpty()) { listener.onResponse(new FieldCapabilitiesResponse(new String[0], Collections.emptyMap())); return; } - final List indexResponses = Collections.synchronizedList(new ArrayList<>()); - final FailureCollector indexFailures = new FailureCollector(); - final Runnable countDown = createResponseMerger(request, totalNumRequest, indexResponses, indexFailures, listener); - - if (concreteIndices.length > 0) { - // fork this action to the management pool as it can fan out to a large number of child requests that get handled on SAME and - // thus would all run on the current transport thread and block it for an unacceptable amount of time - // (particularly with security enabled) - threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(ActionRunnable.wrap(listener, l -> { - for (String index : concreteIndices) { - new TransportFieldCapabilitiesIndexAction.AsyncShardsAction( - transportService, - clusterService, - prepareLocalIndexRequest(request, index, localIndices, nowInMillis), - new ActionListener() { - @Override - public void onResponse(FieldCapabilitiesIndexResponse result) { - if (result.canMatch()) { - indexResponses.add(result); - } - countDown.run(); - } + checkIndexBlocks(clusterState, concreteIndices); - @Override - public void onFailure(Exception e) { - indexFailures.collect(e, index); - countDown.run(); - } - } - ).start(); - } - })); - } + final Map indexResponses = Collections.synchronizedMap(new HashMap<>()); + final FailureCollector indexFailures = new FailureCollector(); + // One for each cluster including the local cluster + final CountDown completionCounter = new CountDown(1 + remoteClusterIndices.size()); + final Runnable countDown = createResponseMerger(request, completionCounter, indexResponses, indexFailures, listener); + final RequestDispatcher requestDispatcher = new RequestDispatcher( + clusterService, + transportService, + task, + request, + localIndices, + nowInMillis, + concreteIndices, + threadPool.executor(ThreadPool.Names.MANAGEMENT), + indexResponse -> indexResponses.putIfAbsent(indexResponse.getIndexName(), indexResponse), + indexFailures::collect, + countDown + ); + requestDispatcher.execute(); // this is the cross cluster part of this API - we force the other cluster to not merge the results but instead // send us back all individual index results. @@ -131,13 +133,9 @@ public void onFailure(Exception e) { FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(request, originalIndices, nowInMillis); remoteClusterClient.fieldCaps(remoteRequest, ActionListener.wrap(response -> { for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) { - indexResponses.add( - new FieldCapabilitiesIndexResponse( - RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName()), - resp.get(), - resp.canMatch() - ) - ); + String indexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName()); + indexResponses.putIfAbsent(indexName, + new FieldCapabilitiesIndexResponse(indexName, resp.get(), resp.canMatch())); } for (FieldCapabilitiesFailure failure : response.getFailures()) { Exception ex = failure.getException(); @@ -160,14 +158,13 @@ private void checkIndexBlocks(ClusterState clusterState, String[] concreteIndice } private Runnable createResponseMerger(FieldCapabilitiesRequest request, - int totalNumRequests, - List indexResponses, + CountDown completionCounter, + Map indexResponses, FailureCollector indexFailures, ActionListener listener) { - final CountDown completionCounter = new CountDown(totalNumRequests); return () -> { if (completionCounter.countDown()) { - List failures = indexFailures.values(); + List failures = indexFailures.build(indexResponses.keySet()); if (indexResponses.size() > 0) { if (request.isMergeResults()) { // fork off to the management pool for merging the responses as the operation can run for longer than is acceptable @@ -178,11 +175,13 @@ private Runnable createResponseMerger(FieldCapabilitiesRequest request, () -> merge(indexResponses, request.includeUnmapped(), new ArrayList<>(failures))) ); } else { - listener.onResponse(new FieldCapabilitiesResponse(indexResponses, new ArrayList<>(failures))); + listener.onResponse(new FieldCapabilitiesResponse( + new ArrayList<>(indexResponses.values()), + new ArrayList<>(failures))); } } else { // we have no responses at all, maybe because of errors - if (indexFailures.size() > 0) { + if (indexFailures.isEmpty() == false) { // throw back the first exception listener.onFailure(failures.iterator().next().getException()); } else { @@ -193,14 +192,6 @@ private Runnable createResponseMerger(FieldCapabilitiesRequest request, }; } - private static FieldCapabilitiesIndexRequest prepareLocalIndexRequest(FieldCapabilitiesRequest request, - String index, - OriginalIndices originalIndices, - long nowInMillis) { - return new FieldCapabilitiesIndexRequest(request.fields(), index, originalIndices, - request.indexFilter(), nowInMillis, request.runtimeFields()); - } - private static FieldCapabilitiesRequest prepareRemoteRequest(FieldCapabilitiesRequest request, OriginalIndices originalIndices, long nowInMillis) { @@ -216,13 +207,13 @@ private static FieldCapabilitiesRequest prepareRemoteRequest(FieldCapabilitiesRe } private FieldCapabilitiesResponse merge( - List indexResponses, + Map indexResponses, boolean includeUnmapped, List failures ) { - String[] indices = indexResponses.stream().map(FieldCapabilitiesIndexResponse::getIndexName).sorted().toArray(String[]::new); + String[] indices = indexResponses.keySet().stream().sorted().toArray(String[]::new); final Map> responseMapBuilder = new HashMap<>(); - for (FieldCapabilitiesIndexResponse response : indexResponses) { + for (FieldCapabilitiesIndexResponse response : indexResponses.values()) { innerMerge(responseMapBuilder, response); } final Map> responseMap = new HashMap<>(); @@ -238,7 +229,6 @@ private FieldCapabilitiesResponse merge( } responseMap.put(entry.getKey(), Collections.unmodifiableMap(typeMap)); } - // de-dup failures return new FieldCapabilitiesResponse(indices, Collections.unmodifiableMap(responseMap), failures); } @@ -269,34 +259,94 @@ private void innerMerge(Map> resp } } + /** + * Collects failures from all the individual index requests, then builds a failure list grouped by the underlying cause. + * + * This collector can contain a failure for an index even if one of its shards was successful. When building the final + * list, these failures will be skipped because they have no affect on the final response. + */ private static final class FailureCollector { - final Map, FieldCapabilitiesFailure> indexFailures = Collections.synchronizedMap( - new HashMap<>() - ); + private final Map failuresByIndex = Collections.synchronizedMap(new HashMap<>()); - List values() { + List build(Set successfulIndices) { + Map, FieldCapabilitiesFailure> indexFailures = Collections.synchronizedMap(new HashMap<>()); + for (Map.Entry failure : failuresByIndex.entrySet()) { + String index = failure.getKey(); + Exception e = failure.getValue(); + + if (successfulIndices.contains(index) == false) { + // we deduplicate exceptions on the underlying causes message and classname + // we unwrap the cause to e.g. group RemoteTransportExceptions coming from different nodes if the cause is the same + Throwable cause = ExceptionsHelper.unwrapCause(e); + Tuple groupingKey = new Tuple<>(cause.getMessage(), cause.getClass().getName()); + indexFailures.compute(groupingKey, + (k, v) -> v == null ? new FieldCapabilitiesFailure(new String[]{index}, e) : v.addIndex(index)); + } + } return new ArrayList<>(indexFailures.values()); } - void collect(Exception e, String index) { - // we deduplicate exceptions on the underlying causes message and classname - // we unwrap the cause to e.g. group RemoteTransportexceptions coming from different nodes if the cause is the same - Throwable cause = ExceptionsHelper.unwrapCause(e); - Tuple groupingKey = new Tuple(cause.getMessage(), cause.getClass().getName()); - indexFailures.compute( - groupingKey, - (k, v) -> v == null ? new FieldCapabilitiesFailure(new String[] {index}, e) : v.addIndex(index) - ); + void collect(String index, Exception e) { + failuresByIndex.putIfAbsent(index, e); } void collectRemoteException(Exception ex, String clusterAlias, String[] remoteIndices) { for (String failedIndex : remoteIndices) { - collect(ex, RemoteClusterAware.buildRemoteIndexName(clusterAlias, failedIndex)); + collect(RemoteClusterAware.buildRemoteIndexName(clusterAlias, failedIndex), ex); } } - int size() { - return this.indexFailures.size(); + boolean isEmpty() { + return failuresByIndex.isEmpty(); } } + + private class NodeTransportHandler implements TransportRequestHandler { + @Override + public void messageReceived(FieldCapabilitiesNodeRequest request, TransportChannel channel, Task task) throws Exception { + final ActionListener listener = new ChannelActionListener<>(channel, ACTION_NODE_NAME, request); + ActionListener.completeWith(listener, () -> { + final List allResponses = new ArrayList<>(); + final Map allFailures = new HashMap<>(); + final Set allUnmatchedShardIds = new HashSet<>(); + // If the request has an index filter, it may contain several shards belonging to the same + // index. We make sure to skip over a shard if we already found a match for that index. + final Map> groupedShardIds = request.shardIds().stream() + .collect(Collectors.groupingBy(ShardId::getIndexName)); + for (List shardIds : groupedShardIds.values()) { + final Map failures = new HashMap<>(); + final Set unmatched = new HashSet<>(); + for (ShardId shardId : shardIds) { + final FieldCapabilitiesIndexRequest indexRequest = new FieldCapabilitiesIndexRequest(request.fields(), shardId, + request.originalIndices(), request.indexFilter(), request.nowInMillis(), request.runtimeFields()); + try { + final FieldCapabilitiesIndexResponse response = fieldCapabilitiesFetcher.fetch(indexRequest); + if (response.canMatch()) { + unmatched.clear(); + failures.clear(); + allResponses.add(response); + break; + } else { + unmatched.add(shardId); + } + } catch (Exception e) { + failures.put(shardId, e); + } + } + allUnmatchedShardIds.addAll(unmatched); + allFailures.putAll(failures); + } + return new FieldCapabilitiesNodeResponse(allResponses, allFailures, allUnmatchedShardIds); + }); + } + } + + private class ShardTransportHandler implements TransportRequestHandler { + @Override + public void messageReceived(FieldCapabilitiesIndexRequest request, TransportChannel channel, Task task) throws Exception { + ActionListener listener = new ChannelActionListener<>(channel, ACTION_SHARD_NAME, request); + ActionListener.completeWith(listener, () -> fieldCapabilitiesFetcher.fetch(request)); + } + } + } diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java deleted file mode 100644 index ae64895db41f7..0000000000000 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java +++ /dev/null @@ -1,301 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0 and the Server Side Public License, v 1; you may not use this file except - * in compliance with, at your election, the Elastic License 2.0 or the Server - * Side Public License, v 1. - */ - -package org.elasticsearch.action.fieldcaps; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.NoShardAvailableActionException; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.ChannelActionListener; -import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.logging.LoggerMessageFormat; -import org.elasticsearch.core.Nullable; -import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.mapper.MappedFieldType; -import org.elasticsearch.index.mapper.ObjectMapper; -import org.elasticsearch.index.mapper.RuntimeField; -import org.elasticsearch.index.query.MatchAllQueryBuilder; -import org.elasticsearch.index.query.SearchExecutionContext; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.search.SearchService; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.search.internal.ShardSearchRequest; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.TransportResponseHandler; -import org.elasticsearch.transport.TransportService; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.function.Predicate; - -import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException; - -public class TransportFieldCapabilitiesIndexAction - extends HandledTransportAction { - - private static final Logger logger = LogManager.getLogger(TransportFieldCapabilitiesIndexAction.class); - - private static final String ACTION_NAME = FieldCapabilitiesAction.NAME + "[index]"; - private static final String ACTION_SHARD_NAME = ACTION_NAME + "[s]"; - - private final ClusterService clusterService; - private final TransportService transportService; - private final IndicesService indicesService; - - @Inject - public TransportFieldCapabilitiesIndexAction(ClusterService clusterService, - TransportService transportService, - IndicesService indicesService, - ActionFilters actionFilters) { - super(ACTION_NAME, transportService, actionFilters, FieldCapabilitiesIndexRequest::new); - this.clusterService = clusterService; - this.transportService = transportService; - this.indicesService = indicesService; - transportService.registerRequestHandler(ACTION_SHARD_NAME, ThreadPool.Names.SAME, - FieldCapabilitiesIndexRequest::new, new ShardTransportHandler()); - } - - @Override - protected void doExecute(Task task, FieldCapabilitiesIndexRequest request, ActionListener listener) { - new AsyncShardsAction(transportService, clusterService, request, listener).start(); - } - - private FieldCapabilitiesIndexResponse shardOperation(final FieldCapabilitiesIndexRequest request) throws IOException { - final ShardId shardId = request.shardId(); - final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); - final IndexShard indexShard = indexService.getShard(request.shardId().getId()); - try (Engine.Searcher searcher = indexShard.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE)) { - - final SearchExecutionContext searchExecutionContext = indexService.newSearchExecutionContext(shardId.id(), 0, - searcher, request::nowInMillis, null, request.runtimeFields()); - - if (canMatchShard(request, searchExecutionContext) == false) { - return new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false); - } - - Set fieldNames = new HashSet<>(); - for (String pattern : request.fields()) { - fieldNames.addAll(searchExecutionContext.getMatchingFieldNames(pattern)); - } - - Predicate fieldPredicate = indicesService.getFieldFilter().apply(shardId.getIndexName()); - Map responseMap = new HashMap<>(); - for (String field : fieldNames) { - MappedFieldType ft = searchExecutionContext.getFieldType(field); - boolean isMetadataField = searchExecutionContext.isMetadataField(field); - if (isMetadataField || fieldPredicate.test(ft.name())) { - IndexFieldCapabilities fieldCap = new IndexFieldCapabilities(field, - ft.familyTypeName(), isMetadataField, ft.isSearchable(), ft.isAggregatable(), ft.meta()); - responseMap.put(field, fieldCap); - } else { - continue; - } - - // Check the ancestor of the field to find nested and object fields. - // Runtime fields are excluded since they can override any path. - //TODO find a way to do this that does not require an instanceof check - if (ft instanceof RuntimeField == false) { - int dotIndex = ft.name().lastIndexOf('.'); - while (dotIndex > -1) { - String parentField = ft.name().substring(0, dotIndex); - if (responseMap.containsKey(parentField)) { - // we added this path on another field already - break; - } - // checks if the parent field contains sub-fields - if (searchExecutionContext.getFieldType(parentField) == null) { - // no field type, it must be an object field - ObjectMapper mapper = searchExecutionContext.getObjectMapper(parentField); - // Composite runtime fields do not have a mapped type for the root - check for null - if (mapper != null) { - String type = mapper.isNested() ? "nested" : "object"; - IndexFieldCapabilities fieldCap = new IndexFieldCapabilities(parentField, type, - false, false, false, Collections.emptyMap()); - responseMap.put(parentField, fieldCap); - } - } - dotIndex = parentField.lastIndexOf('.'); - } - } - } - return new FieldCapabilitiesIndexResponse(request.index(), responseMap, true); - } - } - - private boolean canMatchShard(FieldCapabilitiesIndexRequest req, SearchExecutionContext searchExecutionContext) throws IOException { - if (req.indexFilter() == null || req.indexFilter() instanceof MatchAllQueryBuilder) { - return true; - } - assert req.nowInMillis() != 0L; - ShardSearchRequest searchRequest = new ShardSearchRequest(req.shardId(), null, req.nowInMillis(), AliasFilter.EMPTY); - searchRequest.source(new SearchSourceBuilder().query(req.indexFilter())); - return SearchService.queryStillMatchesAfterRewrite(searchRequest, searchExecutionContext); - } - - /** - * An action that executes on each shard sequentially until it finds one that can match the provided - * {@link FieldCapabilitiesIndexRequest#indexFilter()}. In which case the shard is used - * to create the final {@link FieldCapabilitiesIndexResponse}. - */ - public static class AsyncShardsAction { - private final FieldCapabilitiesIndexRequest request; - private final TransportService transportService; - private final DiscoveryNodes nodes; - private final ActionListener listener; - private final GroupShardsIterator shardsIt; - - private volatile int shardIndex = 0; - - public AsyncShardsAction(TransportService transportService, - ClusterService clusterService, - FieldCapabilitiesIndexRequest request, - ActionListener listener) { - this.listener = listener; - this.transportService = transportService; - - ClusterState clusterState = clusterService.state(); - if (logger.isTraceEnabled()) { - logger.trace("executing [{}] based on cluster state version [{}]", request, clusterState.version()); - } - nodes = clusterState.nodes(); - this.request = request; - shardsIt = clusterService.operationRouting().searchShards(clusterService.state(), - new String[]{request.index()}, null, null, null, null); - } - - public void start() { - tryNext(null, true); - } - - private void onFailure(ShardRouting shardRouting, Exception e) { - if (e != null) { - logger.trace(() -> new ParameterizedMessage("{}: failed to execute [{}]", shardRouting, request), e); - } - tryNext(e, false); - } - - private ShardRouting nextRoutingOrNull() { - if (shardsIt.size() == 0 || shardIndex >= shardsIt.size()) { - return null; - } - ShardRouting next = shardsIt.get(shardIndex).nextOrNull(); - if (next != null) { - return next; - } - moveToNextShard(); - return nextRoutingOrNull(); - } - - private void moveToNextShard() { - ++ shardIndex; - } - - private void tryNext(@Nullable final Exception lastFailure, boolean canMatchShard) { - ShardRouting shardRouting = nextRoutingOrNull(); - if (shardRouting == null) { - if (canMatchShard == false) { - if (lastFailure == null) { - listener.onResponse(new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false)); - } else { - logger.debug(() -> new ParameterizedMessage("{}: failed to execute [{}]", null, request), lastFailure); - listener.onFailure(lastFailure); - } - } else { - if (lastFailure == null || isShardNotAvailableException(lastFailure)) { - listener.onFailure(new NoShardAvailableActionException(null, - LoggerMessageFormat.format("No shard available for [{}]", request), lastFailure)); - } else { - logger.debug(() -> new ParameterizedMessage("{}: failed to execute [{}]", null, request), lastFailure); - listener.onFailure(lastFailure); - } - } - return; - } - DiscoveryNode node = nodes.get(shardRouting.currentNodeId()); - if (node == null) { - onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId())); - } else { - request.shardId(shardRouting.shardId()); - if (logger.isTraceEnabled()) { - logger.trace( - "sending request [{}] on node [{}]", - request, - node - ); - } - transportService.sendRequest(node, ACTION_SHARD_NAME, request, - new TransportResponseHandler() { - - @Override - public FieldCapabilitiesIndexResponse read(StreamInput in) throws IOException { - return new FieldCapabilitiesIndexResponse(in); - } - - @Override - public void handleResponse(final FieldCapabilitiesIndexResponse response) { - if (response.canMatch()) { - listener.onResponse(response); - } else { - moveToNextShard(); - tryNext(null, false); - } - } - - @Override - public void handleException(TransportException exp) { - onFailure(shardRouting, exp); - } - }); - } - } - } - - private class ShardTransportHandler implements TransportRequestHandler { - @Override - public void messageReceived(final FieldCapabilitiesIndexRequest request, - final TransportChannel channel, - Task task) throws Exception { - if (logger.isTraceEnabled()) { - logger.trace("executing [{}]", request); - } - ActionListener listener = new ChannelActionListener<>(channel, ACTION_SHARD_NAME, request); - final FieldCapabilitiesIndexResponse resp; - try { - resp = shardOperation(request); - } catch (Exception exc) { - listener.onFailure(exc); - return; - } - listener.onResponse(resp); - } - } -} diff --git a/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexRequestTests.java b/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexRequestTests.java index 78df47733d68c..3234c2b2d9f98 100644 --- a/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesIndexRequestTests.java @@ -11,6 +11,8 @@ import org.elasticsearch.Version; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -22,7 +24,7 @@ public class FieldCapabilitiesIndexRequestTests extends ESTestCase { public void testSerializingWithRuntimeFieldsBeforeSupportedThrows() { FieldCapabilitiesIndexRequest request = new FieldCapabilitiesIndexRequest( new String[] { "field" }, - "index", + new ShardId(new Index("n/a", "index"), randomIntBetween(0, 100)), new OriginalIndices(new String[] { "original_index" }, IndicesOptions.LENIENT_EXPAND_OPEN), null, 0L, diff --git a/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeRequestTests.java b/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeRequestTests.java new file mode 100644 index 0000000000000..7a9c282a7ab5f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeRequestTests.java @@ -0,0 +1,115 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.fieldcaps; + +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class FieldCapabilitiesNodeRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected FieldCapabilitiesNodeRequest createTestInstance() { + List randomShards = randomShardIds(randomIntBetween(1, 5)); + String[] randomFields = randomFields(randomIntBetween(1, 20)); + OriginalIndices originalIndices = randomOriginalIndices(randomIntBetween(0, 20)); + + QueryBuilder indexFilter = randomBoolean() ? QueryBuilders.termQuery("field", randomAlphaOfLength(5)) : null; + long nowInMillis = randomLong(); + + Map runtimeFields = randomBoolean() + ? Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)) + : null; + + return new FieldCapabilitiesNodeRequest(randomShards, randomFields, originalIndices, indexFilter, nowInMillis, runtimeFields); + } + + private List randomShardIds(int numShards) { + List randomShards = new ArrayList<>(numShards); + for (int i = 0; i < numShards; i++) { + randomShards.add(new ShardId("index", randomAlphaOfLength(10), i)); + } + return randomShards; + } + + private String[] randomFields(int numFields) { + String[] randomFields = new String[numFields]; + for (int i = 0; i < numFields; i++) { + randomFields[i] = randomAlphaOfLengthBetween(5, 10); + } + return randomFields; + } + + private OriginalIndices randomOriginalIndices(int numIndices) { + String[] randomIndices = new String[numIndices]; + for (int i = 0; i < numIndices; i++) { + randomIndices[i] = randomAlphaOfLengthBetween(5, 10); + } + IndicesOptions indicesOptions = randomBoolean() ? IndicesOptions.strictExpand() : IndicesOptions.lenientExpandOpen(); + return new OriginalIndices(randomIndices, indicesOptions); + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList()); + return new NamedWriteableRegistry(searchModule.getNamedWriteables()); + } + + @Override + protected Writeable.Reader instanceReader() { + return FieldCapabilitiesNodeRequest::new; + } + + @Override + protected FieldCapabilitiesNodeRequest mutateInstance(FieldCapabilitiesNodeRequest instance) throws IOException { + switch (random().nextInt(5)) { + case 0: + List shardIds = randomShardIds(instance.shardIds().size() + 1); + return new FieldCapabilitiesNodeRequest(shardIds, instance.fields(), instance.originalIndices(), + instance.indexFilter(), instance.nowInMillis(), instance.runtimeFields()); + case 1: + String[] fields = randomFields(instance.fields().length + 2); + return new FieldCapabilitiesNodeRequest(instance.shardIds(), fields, instance.originalIndices(), + instance.indexFilter(), instance.nowInMillis(), instance.runtimeFields()); + case 2: + OriginalIndices originalIndices = randomOriginalIndices(instance.indices().length + 1); + return new FieldCapabilitiesNodeRequest(instance.shardIds(), instance.fields(), originalIndices, + instance.indexFilter(), instance.nowInMillis(), instance.runtimeFields()); + case 3: + QueryBuilder indexFilter = instance.indexFilter() == null ? QueryBuilders.matchAllQuery() : null; + return new FieldCapabilitiesNodeRequest(instance.shardIds(), instance.fields(), instance.originalIndices(), + indexFilter, instance.nowInMillis(), instance.runtimeFields()); + case 4: + long nowInMillis = instance.nowInMillis() + 100; + return new FieldCapabilitiesNodeRequest(instance.shardIds(), instance.fields(), instance.originalIndices(), + instance.indexFilter(), nowInMillis, instance.runtimeFields()); + case 5: + Map runtimeFields = instance.runtimeFields() == null + ? Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)) + : null; + return new FieldCapabilitiesNodeRequest(instance.shardIds(), instance.fields(), instance.originalIndices(), + instance.indexFilter(), instance.nowInMillis(), runtimeFields); + default: + throw new IllegalStateException("The test should only allow 5 parameters mutated"); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponseTests.java b/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponseTests.java new file mode 100644 index 0000000000000..8e76d59dd9cf2 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesNodeResponseTests.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.fieldcaps; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class FieldCapabilitiesNodeResponseTests extends AbstractWireSerializingTestCase { + + @Override + protected FieldCapabilitiesNodeResponse createTestInstance() { + List responses = new ArrayList<>(); + int numResponse = randomIntBetween(0, 10); + for (int i = 0; i < numResponse; i++) { + responses.add(FieldCapabilitiesResponseTests.createRandomIndexResponse()); + } + int numUnmatched = randomIntBetween(0, 3); + Set shardIds = new HashSet<>(); + for (int i = 0; i < numUnmatched; i++) { + shardIds.add(new ShardId(randomAlphaOfLength(10), randomAlphaOfLength(10), between(0, 10))); + } + return new FieldCapabilitiesNodeResponse(responses, Collections.emptyMap(), shardIds); + } + + @Override + protected Writeable.Reader instanceReader() { + return FieldCapabilitiesNodeResponse::new; + } + + @Override + protected FieldCapabilitiesNodeResponse mutateInstance(FieldCapabilitiesNodeResponse response) { + List newResponses = new ArrayList<>(response.getIndexResponses()); + int mutation = response.getIndexResponses().isEmpty() ? 0 : randomIntBetween(0, 2); + switch (mutation) { + case 0: + newResponses.add(FieldCapabilitiesResponseTests.createRandomIndexResponse()); + break; + case 1: + int toRemove = randomInt(newResponses.size() - 1); + newResponses.remove(toRemove); + break; + case 2: + int toReplace = randomInt(newResponses.size() - 1); + newResponses.set(toReplace, FieldCapabilitiesResponseTests.createRandomIndexResponse()); + break; + } + return new FieldCapabilitiesNodeResponse(newResponses, Collections.emptyMap(), response.getUnmatchedShardIds()); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponseTests.java b/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponseTests.java index 500bac92e50cf..e86447959cc0b 100644 --- a/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/fieldcaps/FieldCapabilitiesResponseTests.java @@ -51,7 +51,11 @@ protected Writeable.Reader instanceReader() { return FieldCapabilitiesResponse::new; } - private FieldCapabilitiesIndexResponse createRandomIndexResponse() { + public static FieldCapabilitiesIndexResponse createRandomIndexResponse() { + return randomIndexResponse(randomAsciiLettersOfLength(10), randomBoolean()); + } + + public static FieldCapabilitiesIndexResponse randomIndexResponse(String index, boolean canMatch) { Map responses = new HashMap<>(); String[] fields = generateRandomStringArray(5, 10, false, true); @@ -60,10 +64,10 @@ private FieldCapabilitiesIndexResponse createRandomIndexResponse() { for (String field : fields) { responses.put(field, randomFieldCaps(field)); } - return new FieldCapabilitiesIndexResponse(randomAsciiLettersOfLength(10), responses, randomBoolean()); + return new FieldCapabilitiesIndexResponse(index, responses, canMatch); } - private static IndexFieldCapabilities randomFieldCaps(String fieldName) { + public static IndexFieldCapabilities randomFieldCaps(String fieldName) { Map meta; switch (randomInt(2)) { case 0: diff --git a/server/src/test/java/org/elasticsearch/action/fieldcaps/RequestDispatcherTests.java b/server/src/test/java/org/elasticsearch/action/fieldcaps/RequestDispatcherTests.java new file mode 100644 index 0000000000000..0e6147deb6dbf --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/fieldcaps/RequestDispatcherTests.java @@ -0,0 +1,1057 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.fieldcaps; + +import com.carrotsearch.hppc.ObjectIntHashMap; +import com.carrotsearch.hppc.ObjectIntMap; +import com.carrotsearch.hppc.cursors.IntCursor; +import com.carrotsearch.hppc.cursors.ObjectIntCursor; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.Version; +import org.elasticsearch.action.OriginalIndices; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.test.gateway.TestGatewayAllocator; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportInterceptor; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportResponseHandler; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.nio.MockNioTransport; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponseTests.randomIndexResponse; +import static org.elasticsearch.action.fieldcaps.RequestDispatcher.GROUP_REQUESTS_VERSION; +import static org.hamcrest.Matchers.anEmptyMap; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.in; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.not; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Besides the assertions in each test, the variants of {@link RequestDispatcher} are verified in + * {@link RequestTracker#verifyAfterComplete()} after each test. + */ +public class RequestDispatcherTests extends ESAllocationTestCase { + static final Logger logger = LogManager.getLogger(RequestDispatcherTests.class); + + public void testHappyCluster() throws Exception { + final List allIndices = IntStream.rangeClosed(1, 5).mapToObj(n -> "index_" + n).collect(Collectors.toList()); + final ClusterState clusterState; + final boolean newVersionOnly = randomBoolean(); + { + DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder(); + int numNodes = randomIntBetween(1, 10); + for (int i = 0; i < numNodes; i++) { + final Version nodeVersion; + if (newVersionOnly || randomBoolean()) { + nodeVersion = randomNewVersion(); + } else { + nodeVersion = randomOldVersion(); + } + discoNodes.add(newNode("node_" + i, nodeVersion)); + } + Metadata.Builder metadata = Metadata.builder(); + for (String index : allIndices) { + final Settings.Builder settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 10)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 2)) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT.minimumIndexCompatibilityVersion()); + metadata.put(IndexMetadata.builder(index).settings(settings)); + } + clusterState = newClusterState(metadata.build(), discoNodes.build()); + } + try (TestTransportService transportService = TestTransportService.newTestTransportService()) { + final List indices = randomSubsetOf(between(1, allIndices.size()), allIndices); + logger.debug("--> test with indices {}", indices); + final boolean withFilter = randomBoolean(); + final ResponseCollector responseCollector = new ResponseCollector(); + final RequestDispatcher dispatcher = new RequestDispatcher( + mockClusterService(clusterState), + transportService, + newRandomParentTask(), + randomFieldCapRequest(withFilter), + OriginalIndices.NONE, + randomNonNegativeLong(), + indices.toArray(new String[0]), + transportService.threadPool.executor(ThreadPool.Names.MANAGEMENT), + responseCollector::addIndexResponse, + responseCollector::addIndexFailure, + responseCollector::onComplete); + final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), withFilter); + transportService.requestTracker.set(requestTracker); + dispatcher.execute(); + responseCollector.awaitCompletion(); + assertThat(responseCollector.responses.keySet(), equalTo(Sets.newHashSet(indices))); + assertThat(responseCollector.failures, anEmptyMap()); + assertThat("Happy case should complete after one round", dispatcher.executionRound(), equalTo(1)); + for (NodeRequest nodeRequest : requestTracker.sentNodeRequests) { + assertThat("All requests occur in round 0",nodeRequest.round, equalTo(0)); + } + for (String index : indices) { + final List nodeRequests = requestTracker.nodeRequests(index); + final List shardRequests = requestTracker.shardRequests(index); + if (withFilter) { + Set requestedShardIds = new HashSet<>(); + for (NodeRequest nodeRequest : nodeRequests) { + for (ShardId shardId : nodeRequest.requestedShardIds(index)) { + assertTrue(requestedShardIds.add(shardId)); + } + } + for (ShardRequest shardRequest : shardRequests) { + assertTrue(requestedShardIds.add(shardRequest.request.shardId())); + } + final Set assignedShardIds = clusterState.routingTable().index(index).randomAllActiveShardsIt() + .getShardRoutings().stream() + .map(ShardRouting::shardId).collect(Collectors.toSet()); + assertThat(requestedShardIds, equalTo(assignedShardIds)); + } else { + assertThat("index " + index + " wasn't requested one time", nodeRequests.size() + shardRequests.size(), equalTo(1)); + } + } + } + } + + public void testRetryThenOk() throws Exception { + final List allIndices = IntStream.rangeClosed(1, 5).mapToObj(n -> "index_" + n).collect(Collectors.toList()); + final ClusterState clusterState; + { + DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder(); + int numNodes = randomIntBetween(2, 10); + for (int i = 0; i < numNodes; i++) { + discoNodes.add(newNode("node_" + i, randomBoolean() ? randomNewVersion() : randomOldVersion())); + } + Metadata.Builder metadata = Metadata.builder(); + for (String index : allIndices) { + final Settings.Builder settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 10)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(1, 3)) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT.minimumIndexCompatibilityVersion()); + metadata.put(IndexMetadata.builder(index).settings(settings)); + } + clusterState = newClusterState(metadata.build(), discoNodes.build()); + } + try (TestTransportService transportService = TestTransportService.newTestTransportService()) { + final List indices = randomSubsetOf(between(1, allIndices.size()), allIndices); + logger.debug("--> test with indices {}", indices); + final boolean withFilter = randomBoolean(); + final ResponseCollector responseCollector = new ResponseCollector(); + final RequestDispatcher dispatcher = new RequestDispatcher( + mockClusterService(clusterState), + transportService, + newRandomParentTask(), + randomFieldCapRequest(withFilter), + OriginalIndices.NONE, + randomNonNegativeLong(), + indices.toArray(new String[0]), + transportService.threadPool.executor(ThreadPool.Names.MANAGEMENT), + responseCollector::addIndexResponse, + responseCollector::addIndexFailure, + responseCollector::onComplete + ); + final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), withFilter); + transportService.requestTracker.set(requestTracker); + + final Map maxFailedRounds = new HashMap<>(); + for (String index : randomSubsetOf(between(1, indices.size()), indices)) { + maxFailedRounds.put(index, randomIntBetween(1, maxPossibleRounds(clusterState, index, withFilter) - 1)); + } + + final AtomicInteger failedTimes = new AtomicInteger(); + transportService.setTransportInterceptor(new TransportInterceptor.AsyncSender() { + @Override + public void sendRequest(Transport.Connection connection, String action, + TransportRequest request, TransportRequestOptions options, + TransportResponseHandler handler) { + final int currentRound = dispatcher.executionRound(); + if (request instanceof FieldCapabilitiesNodeRequest) { + FieldCapabilitiesNodeRequest nodeRequest = (FieldCapabilitiesNodeRequest) request; + Set requestedIndices = nodeRequest.shardIds().stream() + .map(ShardId::getIndexName) + .collect(Collectors.toSet()); + if (currentRound > 0) { + assertThat("Only failed indices are retried after the first found", + requestedIndices, everyItem(in(maxFailedRounds.keySet()))); + } + Set successIndices = new HashSet<>(); + List failedShards = new ArrayList<>(); + for (ShardId shardId : nodeRequest.shardIds()) { + final Integer maxRound = maxFailedRounds.get(shardId.getIndexName()); + if (maxRound == null || currentRound >= maxRound) { + successIndices.add(shardId.getIndexName()); + } else { + failedShards.add(shardId); + failedTimes.incrementAndGet(); + } + } + transportService.sendResponse(handler, + randomNodeResponse(successIndices, failedShards, Collections.emptySet())); + } else { + FieldCapabilitiesIndexRequest indexRequest = (FieldCapabilitiesIndexRequest) request; + final String index = indexRequest.index(); + if (currentRound > 0) { + assertThat("Only failed index is executed after the first found", index, in(maxFailedRounds.keySet())); + } + final Integer maxRound = maxFailedRounds.get(index); + if (maxRound == null || currentRound >= maxRound) { + transportService.sendResponse(handler, randomIndexResponse(index, true)); + } else { + failedTimes.incrementAndGet(); + transportService.sendFailure(handler, new IllegalStateException("shard was closed")); + } + } + } + }); + + dispatcher.execute(); + responseCollector.awaitCompletion(); + assertThat(responseCollector.responses.keySet(), equalTo(Sets.newHashSet(indices))); + assertThat(responseCollector.failures, anEmptyMap()); + int maxRound = maxFailedRounds.values().stream().mapToInt(n -> n).max().getAsInt(); + assertThat(dispatcher.executionRound(), equalTo(maxRound + 1)); + for (String index : indices) { + if (withFilter) { + ObjectIntMap copies = new ObjectIntHashMap<>(); + for (ShardRouting shardRouting : clusterState.routingTable().index(index).randomAllActiveShardsIt()) { + copies.addTo(shardRouting.shardId(), 1); + } + final int executedRounds = maxFailedRounds.getOrDefault(index, 0); + for (int round = 0; round <= executedRounds; round++) { + final Set requestedShards = new HashSet<>(); + for (NodeRequest nodeRequest : requestTracker.nodeRequests(index, round)) { + for (ShardId shardId : nodeRequest.requestedShardIds(index)) { + assertTrue(requestedShards.add(shardId)); + } + } + for (ShardRequest shardRequest : requestTracker.shardRequests(index, round)) { + assertTrue(requestedShards.add(shardRequest.request.shardId())); + } + final Set availableShards = new HashSet<>(); + for (ObjectIntCursor e : copies) { + if (e.value > 0) { + availableShards.add(e.key); + copies.addTo(e.key, -1); + } + } + assertThat("round: " + round, requestedShards, equalTo(availableShards)); + } + } else { + final Integer failedRounds = maxFailedRounds.get(index); + final int sentRequests = requestTracker.shardRequests(index).size() + requestTracker.nodeRequests(index).size(); + if (failedRounds != null) { + assertThat(sentRequests, equalTo(failedRounds + 1)); + } else { + assertThat(sentRequests, equalTo(1)); + } + } + } + } + } + + public void testRetryButFails() throws Exception { + final List allIndices = IntStream.rangeClosed(1, 5).mapToObj(n -> "index_" + n).collect(Collectors.toList()); + final ClusterState clusterState; + { + DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder(); + int numNodes = randomIntBetween(1, 10); + for (int i = 0; i < numNodes; i++) { + discoNodes.add(newNode("node_" + i, randomBoolean() ? randomNewVersion() : randomOldVersion())); + } + Metadata.Builder metadata = Metadata.builder(); + for (String index : allIndices) { + final Settings.Builder settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 10)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 3)) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT.minimumIndexCompatibilityVersion()); + metadata.put(IndexMetadata.builder(index).settings(settings)); + } + clusterState = newClusterState(metadata.build(), discoNodes.build()); + } + try (TestTransportService transportService = TestTransportService.newTestTransportService()) { + final List indices = randomSubsetOf(between(1, allIndices.size()), allIndices); + logger.debug("--> test with indices {}", indices); + final boolean withFilter = randomBoolean(); + final ResponseCollector responseCollector = new ResponseCollector(); + final RequestDispatcher dispatcher = new RequestDispatcher( + mockClusterService(clusterState), + transportService, + newRandomParentTask(), + randomFieldCapRequest(withFilter), + OriginalIndices.NONE, + randomNonNegativeLong(), + indices.toArray(new String[0]), + transportService.threadPool.executor(ThreadPool.Names.MANAGEMENT), + responseCollector::addIndexResponse, + responseCollector::addIndexFailure, + responseCollector::onComplete); + final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), withFilter); + transportService.requestTracker.set(requestTracker); + + List failedIndices = randomSubsetOf(between(1, indices.size()), indices); + + final AtomicInteger failedTimes = new AtomicInteger(); + transportService.setTransportInterceptor(new TransportInterceptor.AsyncSender() { + @Override + public void sendRequest(Transport.Connection connection, String action, + TransportRequest request, TransportRequestOptions options, + TransportResponseHandler handler) { + final int currentRound = dispatcher.executionRound(); + if (request instanceof FieldCapabilitiesNodeRequest) { + FieldCapabilitiesNodeRequest nodeRequest = (FieldCapabilitiesNodeRequest) request; + if (currentRound > 0) { + for (ShardId shardId : nodeRequest.shardIds()) { + assertThat("Only failed indices are retried after the first found", + shardId.getIndexName(), in(failedIndices)); + } + } + Set toRespondIndices = new HashSet<>(); + Set toFailShards = new HashSet<>(); + for (ShardId shardId : nodeRequest.shardIds()) { + if (failedIndices.contains(shardId.getIndexName())) { + toFailShards.add(shardId); + failedTimes.incrementAndGet(); + } else { + toRespondIndices.add(shardId.getIndexName()); + } + } + transportService.sendResponse(handler, randomNodeResponse(toRespondIndices, toFailShards, Collections.emptySet())); + } else { + FieldCapabilitiesIndexRequest indexRequest = (FieldCapabilitiesIndexRequest) request; + final String index = indexRequest.index(); + if (currentRound > 0) { + assertThat("Only failed index is executed after the first found", index, in(failedIndices)); + } + if (failedIndices.contains(index)) { + failedTimes.incrementAndGet(); + transportService.sendFailure(handler, new IllegalStateException("shard was closed")); + } else { + transportService.sendResponse(handler, randomIndexResponse(index, true)); + } + } + } + }); + + dispatcher.execute(); + responseCollector.awaitCompletion(); + assertThat(failedTimes.get(), greaterThan(0)); + assertThat(responseCollector.responses.keySet(), + equalTo(indices.stream().filter(i -> failedIndices.contains(i) == false).collect(Collectors.toSet()))); + assertThat(responseCollector.failures.keySet(), equalTo(Sets.newHashSet(failedIndices))); + + int maxRound = failedIndices.stream().mapToInt(index -> maxPossibleRounds(clusterState, index, withFilter)).max().getAsInt(); + assertThat(dispatcher.executionRound(), equalTo(maxRound)); + for (String index : indices) { + if (withFilter) { + ObjectIntMap copies = new ObjectIntHashMap<>(); + for (ShardRouting shardRouting : clusterState.routingTable().index(index).randomAllActiveShardsIt()) { + copies.addTo(shardRouting.shardId(), 1); + } + final int executedRounds = failedIndices.contains(index) ? maxPossibleRounds(clusterState, index, true) : 0; + for (int round = 0; round <= executedRounds; round++) { + final Set requestedShards = new HashSet<>(); + for (NodeRequest nodeRequest : requestTracker.nodeRequests(index, round)) { + for (ShardId shardId : nodeRequest.requestedShardIds(index)) { + assertTrue(requestedShards.add(shardId)); + } + } + for (ShardRequest shardRequest : requestTracker.shardRequests(index, round)) { + assertTrue(requestedShards.add(shardRequest.request.shardId())); + } + final Set availableShards = new HashSet<>(); + for (ObjectIntCursor e : copies) { + if (e.value > 0) { + availableShards.add(e.key); + copies.addTo(e.key, -1); + } + } + assertThat("round: " + round, requestedShards, equalTo(availableShards)); + } + if (failedIndices.contains(index)) { + for (ObjectIntCursor cursor : copies) { + assertThat("All copies of shard " + cursor.key + " must be tried", cursor.value, equalTo(0)); + } + } + } else { + final int sentRequests = requestTracker.shardRequests(index).size() + requestTracker.nodeRequests(index).size(); + if (failedIndices.contains(index)) { + assertThat(sentRequests, equalTo(maxPossibleRounds(clusterState, index, false))); + } else { + assertThat(sentRequests, equalTo(1)); + } + } + } + } + } + + public void testSuccessWithAnyMatch() throws Exception { + final List allIndices = IntStream.rangeClosed(1, 5).mapToObj(n -> "index_" + n).collect(Collectors.toList()); + final ClusterState clusterState; + final boolean newVersionOnly = randomBoolean(); + { + DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder(); + int numNodes = randomIntBetween(1, 10); + for (int i = 0; i < numNodes; i++) { + final Version nodeVersion; + if (newVersionOnly || randomBoolean()) { + nodeVersion = randomNewVersion(); + } else { + nodeVersion = randomOldVersion(); + } + discoNodes.add(newNode("node_" + i, nodeVersion)); + } + Metadata.Builder metadata = Metadata.builder(); + for (String index : allIndices) { + final Settings.Builder settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(2, 10)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 2)) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT.minimumIndexCompatibilityVersion()); + metadata.put(IndexMetadata.builder(index).settings(settings)); + } + clusterState = newClusterState(metadata.build(), discoNodes.build()); + } + try (TestTransportService transportService = TestTransportService.newTestTransportService()) { + final List indices = randomSubsetOf(between(1, allIndices.size()), allIndices); + logger.debug("--> test with indices {}", indices); + final boolean withFilter = true; + final ResponseCollector responseCollector = new ResponseCollector(); + final RequestDispatcher dispatcher = new RequestDispatcher( + mockClusterService(clusterState), + transportService, + newRandomParentTask(), + randomFieldCapRequest(withFilter), + OriginalIndices.NONE, + randomNonNegativeLong(), + indices.toArray(new String[0]), + transportService.threadPool.executor(ThreadPool.Names.MANAGEMENT), + responseCollector::addIndexResponse, + responseCollector::addIndexFailure, + responseCollector::onComplete); + final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), withFilter); + transportService.requestTracker.set(requestTracker); + final AtomicInteger failedTimes = new AtomicInteger(); + final Set allUnmatchedShardIds = new HashSet<>(); + for (String index : indices) { + final Set shardIds = new HashSet<>(); + for (ShardRouting shardRouting : clusterState.routingTable().index(index).randomAllActiveShardsIt()) { + shardIds.add(shardRouting.shardId()); + } + assertThat("suspect index requires at lease two shards", shardIds.size(), greaterThan(1)); + allUnmatchedShardIds.addAll(randomSubsetOf(between(1, shardIds.size() - 1), shardIds)); + } + transportService.setTransportInterceptor(new TransportInterceptor.AsyncSender() { + @Override + public void sendRequest(Transport.Connection connection, String action, + TransportRequest request, TransportRequestOptions options, + TransportResponseHandler handler) { + if (request instanceof FieldCapabilitiesNodeRequest) { + FieldCapabilitiesNodeRequest nodeRequest = (FieldCapabilitiesNodeRequest) request; + Set toRespondIndices = new HashSet<>(); + Set unmatchedShardIds = new HashSet<>(); + for (ShardId shardId : nodeRequest.shardIds()) { + if (allUnmatchedShardIds.contains(shardId)) { + assertTrue(unmatchedShardIds.add(shardId)); + } else { + toRespondIndices.add(shardId.getIndexName()); + } + } + transportService.sendResponse(handler, + randomNodeResponse(toRespondIndices, Collections.emptyList(), unmatchedShardIds)); + } else { + FieldCapabilitiesIndexRequest indexRequest = (FieldCapabilitiesIndexRequest) request; + if (allUnmatchedShardIds.contains(indexRequest.shardId())) { + failedTimes.incrementAndGet(); + transportService.sendResponse(handler, randomIndexResponse(indexRequest.index(), false)); + } else { + transportService.sendResponse(handler, randomIndexResponse(indexRequest.index(), true)); + } + } + } + }); + dispatcher.execute(); + responseCollector.awaitCompletion(); + assertThat(responseCollector.responses.keySet(), equalTo(Sets.newHashSet(indices))); + assertThat(responseCollector.failures, anEmptyMap()); + assertThat(dispatcher.executionRound(), equalTo(1)); + for (String index : indices) { + final List nodeRequests = requestTracker.nodeRequests(index); + final List shardRequests = requestTracker.shardRequests(index); + Set requestedShardIds = new HashSet<>(); + for (NodeRequest nodeRequest : nodeRequests) { + for (ShardId shardId : nodeRequest.requestedShardIds(index)) { + assertTrue(requestedShardIds.add(shardId)); + } + } + for (ShardRequest shardRequest : shardRequests) { + assertTrue(requestedShardIds.add(shardRequest.request.shardId())); + } + final Set assignedShardIds = clusterState.routingTable().index(index).randomAllActiveShardsIt() + .getShardRoutings().stream() + .map(ShardRouting::shardId).collect(Collectors.toSet()); + assertThat(requestedShardIds, equalTo(assignedShardIds)); + } + } + } + + public void testStopAfterAllShardsUnmatched() throws Exception { + final List allIndices = IntStream.rangeClosed(1, 5).mapToObj(n -> "index_" + n).collect(Collectors.toList()); + final ClusterState clusterState; + final boolean newVersionOnly = randomBoolean(); + { + DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder(); + int numNodes = randomIntBetween(1, 10); + for (int i = 0; i < numNodes; i++) { + final Version nodeVersion; + if (newVersionOnly || randomBoolean()) { + nodeVersion = randomNewVersion(); + } else { + nodeVersion = randomOldVersion(); + } + discoNodes.add(newNode("node_" + i, nodeVersion)); + } + Metadata.Builder metadata = Metadata.builder(); + for (String index : allIndices) { + final Settings.Builder settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 10)) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 2)) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT.minimumIndexCompatibilityVersion()); + metadata.put(IndexMetadata.builder(index).settings(settings)); + } + clusterState = newClusterState(metadata.build(), discoNodes.build()); + } + try (TestTransportService transportService = TestTransportService.newTestTransportService()) { + final List indices = randomSubsetOf(between(1, allIndices.size()), allIndices); + logger.debug("--> test with indices {}", indices); + final boolean withFilter = true; + final ResponseCollector responseCollector = new ResponseCollector(); + final RequestDispatcher dispatcher = new RequestDispatcher( + mockClusterService(clusterState), + transportService, + newRandomParentTask(), + randomFieldCapRequest(withFilter), + OriginalIndices.NONE, + randomNonNegativeLong(), + indices.toArray(new String[0]), + transportService.threadPool.executor(ThreadPool.Names.MANAGEMENT), + responseCollector::addIndexResponse, + responseCollector::addIndexFailure, + responseCollector::onComplete + ); + final RequestTracker requestTracker = new RequestTracker(dispatcher, clusterState.routingTable(), withFilter); + transportService.requestTracker.set(requestTracker); + final AtomicInteger failedTimes = new AtomicInteger(); + final List unmatchedIndices = randomSubsetOf(between(1, indices.size()), indices); + transportService.setTransportInterceptor(new TransportInterceptor.AsyncSender() { + @Override + public void sendRequest(Transport.Connection connection, String action, + TransportRequest request, TransportRequestOptions options, + TransportResponseHandler handler) { + if (request instanceof FieldCapabilitiesNodeRequest) { + FieldCapabilitiesNodeRequest nodeRequest = (FieldCapabilitiesNodeRequest) request; + Set toRespondIndices = new HashSet<>(); + Set unmatchedShardIds = new HashSet<>(); + for (ShardId shardId : nodeRequest.shardIds()) { + if (unmatchedIndices.contains(shardId.getIndexName())) { + assertTrue(unmatchedShardIds.add(shardId)); + } else { + toRespondIndices.add(shardId.getIndexName()); + } + } + transportService.sendResponse(handler, + randomNodeResponse(toRespondIndices, Collections.emptyList(), unmatchedShardIds)); + } else { + FieldCapabilitiesIndexRequest indexRequest = (FieldCapabilitiesIndexRequest) request; + if (unmatchedIndices.contains(indexRequest.index())) { + failedTimes.incrementAndGet(); + transportService.sendResponse(handler, randomIndexResponse(indexRequest.index(), false)); + } else { + transportService.sendResponse(handler, randomIndexResponse(indexRequest.index(), true)); + } + } + } + }); + dispatcher.execute(); + responseCollector.awaitCompletion(); + assertThat(responseCollector.responses.keySet(), + equalTo(indices.stream().filter(index -> unmatchedIndices.contains(index) == false).collect(Collectors.toSet()))); + assertThat(responseCollector.failures, anEmptyMap()); + assertThat(dispatcher.executionRound(), equalTo(1)); + for (String index : indices) { + final List nodeRequests = requestTracker.nodeRequests(index); + final List shardRequests = requestTracker.shardRequests(index); + Set requestedShardIds = new HashSet<>(); + for (NodeRequest nodeRequest : nodeRequests) { + for (ShardId shardId : nodeRequest.requestedShardIds(index)) { + assertTrue(requestedShardIds.add(shardId)); + } + } + for (ShardRequest shardRequest : shardRequests) { + assertTrue(requestedShardIds.add(shardRequest.request.shardId())); + } + final Set assignedShardIds = clusterState.routingTable().index(index).randomAllActiveShardsIt() + .getShardRoutings().stream() + .map(ShardRouting::shardId).collect(Collectors.toSet()); + assertThat(requestedShardIds, equalTo(assignedShardIds)); + } + } + } + + private static class NodeRequest { + final int round; + final DiscoveryNode node; + final FieldCapabilitiesNodeRequest request; + + NodeRequest(int round, DiscoveryNode node, FieldCapabilitiesNodeRequest request) { + this.round = round; + this.node = node; + this.request = request; + } + + Set indices() { + return request.shardIds().stream().map(ShardId::getIndexName).collect(Collectors.toSet()); + } + + Set requestedShardIds(String index) { + return request.shardIds().stream().filter(s -> s.getIndexName().equals(index)).collect(Collectors.toSet()); + } + } + + private static class ShardRequest { + final int round; + final DiscoveryNode node; + final FieldCapabilitiesIndexRequest request; + + ShardRequest(int round, DiscoveryNode node, FieldCapabilitiesIndexRequest request) { + this.round = round; + this.node = node; + this.request = request; + } + } + + private static class RequestTracker { + private final RequestDispatcher dispatcher; + private final RoutingTable routingTable; + private final boolean withFilter; + private final AtomicInteger currentRound = new AtomicInteger(); + + final List sentNodeRequests = new CopyOnWriteArrayList<>(); + final List sentShardRequests = new CopyOnWriteArrayList<>(); + + RequestTracker(RequestDispatcher dispatcher, RoutingTable routingTable, boolean withFilter) { + this.dispatcher = dispatcher; + this.routingTable = routingTable; + this.withFilter = withFilter; + } + + void verifyAfterComplete() { + final int lastRound = dispatcher.executionRound(); + // No requests are sent in the last round + for (NodeRequest request : sentNodeRequests) { + assertThat(request.round, lessThan(lastRound)); + } + for (ShardRequest request : sentShardRequests) { + assertThat(request.round, lessThan(lastRound)); + } + for (int i = 0; i < lastRound; i++) { + int round = i; + List shardRequests = sentShardRequests.stream().filter(r -> r.round == round).collect(Collectors.toList()); + List nodeRequests = sentNodeRequests.stream().filter(r -> r.round == round).collect(Collectors.toList()); + if (withFilter == false) { + // Without filter, each index is requested once in each round. + ObjectIntMap requestsPerIndex = new ObjectIntHashMap<>(); + shardRequests.forEach(r -> requestsPerIndex.addTo(r.request.index(), 1)); + nodeRequests.forEach(r -> r.indices().forEach(index -> requestsPerIndex.addTo(index, 1))); + for (ObjectIntCursor e : requestsPerIndex) { + assertThat("index " + e.key + " has requested more than once", e.value, equalTo(1)); + } + } + // With or without filter, each new node receives at most one request each round + final Map> requestsPerNode = sentNodeRequests.stream() + .filter(r -> r.round == round) + .collect(Collectors.groupingBy(r -> r.node)); + for (Map.Entry> e : requestsPerNode.entrySet()) { + assertThat("node " + e.getKey().getName() + " receives more than 1 requests in round " + currentRound, + e.getValue(), hasSize(1)); + } + // No shardId is requested more than once in a round + Set requestedShards = new HashSet<>(); + for (ShardRequest shardRequest : shardRequests) { + assertTrue(requestedShards.add(shardRequest.request.shardId())); + } + for (NodeRequest nodeRequest : nodeRequests) { + for (ShardId shardId : nodeRequest.request.shardIds()) { + assertTrue(requestedShards.add(shardId)); + } + } + } + + // Request only shards that assigned to target nodes + for (NodeRequest nodeRequest : sentNodeRequests) { + for (String index : nodeRequest.indices()) { + final Set requestedShardIds = nodeRequest.requestedShardIds(index); + final Set assignedShardIds = assignedShardsOnNode(routingTable.index(index), nodeRequest.node.getId()); + assertThat(requestedShardIds, everyItem(in(assignedShardIds))); + } + } + for (ShardRequest shardRequest : sentShardRequests) { + final String index = shardRequest.request.index(); + final Set assignedShardIds = assignedShardsOnNode(routingTable.index(index), shardRequest.node.getId()); + assertThat(shardRequest.request.shardId(), in(assignedShardIds)); + } + + // No shard is requested twice + Map> requestedShardIdsPerNode = new HashMap<>(); + for (NodeRequest nodeRequest : sentNodeRequests) { + final Set shardIds = requestedShardIdsPerNode.computeIfAbsent(nodeRequest.node.getId(), k -> new HashSet<>()); + for (ShardId shardId : nodeRequest.request.shardIds()) { + assertTrue(shardIds.add(shardId)); + } + } + for (ShardRequest shardRequest : sentShardRequests) { + final Set shardIds = requestedShardIdsPerNode.computeIfAbsent(shardRequest.node.getId(), k -> new HashSet<>()); + assertTrue(shardIds.add(shardRequest.request.shardId())); + } + } + + void verifyAndTrackRequest(Transport.Connection connection, String action, TransportRequest request) { + final int requestRound = dispatcher.executionRound(); + final DiscoveryNode node = connection.getNode(); + if (action.equals(TransportFieldCapabilitiesAction.ACTION_NODE_NAME)) { + assertTrue(node.getVersion().toString(), node.getVersion().onOrAfter(GROUP_REQUESTS_VERSION)); + assertThat(request, instanceOf(FieldCapabilitiesNodeRequest.class)); + FieldCapabilitiesNodeRequest nodeRequest = (FieldCapabilitiesNodeRequest) request; + sentNodeRequests.add(new NodeRequest(requestRound, node, nodeRequest)); + } else { + assertThat(action, equalTo(TransportFieldCapabilitiesAction.ACTION_SHARD_NAME)); + assertTrue(node.getVersion().toString(), node.getVersion().before(GROUP_REQUESTS_VERSION)); + assertThat(request, instanceOf(FieldCapabilitiesIndexRequest.class)); + FieldCapabilitiesIndexRequest shardRequest = (FieldCapabilitiesIndexRequest) request; + sentShardRequests.add(new ShardRequest(requestRound, node, shardRequest)); + } + } + + List shardRequests(String index) { + return sentShardRequests.stream().filter(r -> r.request.index().equals(index)).collect(Collectors.toList()); + } + + List shardRequests(String index, int round) { + return sentShardRequests.stream().filter(r -> r.round == round && r.request.index().equals(index)) + .collect(Collectors.toList()); + } + + List nodeRequests(String index, int round) { + return sentNodeRequests.stream().filter(r -> r.round == round && r.indices().contains(index)) + .collect(Collectors.toList()); + } + + List nodeRequests(String index) { + return sentNodeRequests.stream().filter(r -> r.indices().contains(index)).collect(Collectors.toList()); + } + } + + private static class TestTransportService extends TransportService { + final SetOnce requestTracker = new SetOnce<>(); + + final ThreadPool threadPool; + private TransportInterceptor.AsyncSender interceptor = null; + + private TestTransportService(Transport transport, TransportInterceptor.AsyncSender asyncSender, ThreadPool threadPool) { + super(Settings.EMPTY, transport, threadPool, new TransportInterceptor() { + @Override + public AsyncSender interceptSender(AsyncSender sender) { + return asyncSender; + } + }, addr -> newNode("local"), null, Collections.emptySet()); + this.threadPool = threadPool; + } + + @Override + public Transport.Connection getConnection(DiscoveryNode node) { + final Transport.Connection conn = mock(Transport.Connection.class); + when(conn.getNode()).thenReturn(node); + return conn; + } + + static TestTransportService newTestTransportService() { + final TestThreadPool threadPool = new TestThreadPool("test"); + MockNioTransport mockTransport = new MockNioTransport(Settings.EMPTY, Version.CURRENT, threadPool, + new NetworkService(Collections.emptyList()), PageCacheRecycler.NON_RECYCLING_INSTANCE, + new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService()); + SetOnce asyncSenderHolder = new SetOnce<>(); + TestTransportService transportService = new TestTransportService(mockTransport, new TransportInterceptor.AsyncSender() { + @Override + public void sendRequest(Transport.Connection connection, String action, + TransportRequest request, TransportRequestOptions options, + TransportResponseHandler handler) { + final TransportInterceptor.AsyncSender asyncSender = asyncSenderHolder.get(); + assertNotNull(asyncSender); + asyncSender.sendRequest(connection, action, request, options, handler); + } + }, threadPool); + asyncSenderHolder.set(new TransportInterceptor.AsyncSender() { + @Override + public void sendRequest(Transport.Connection connection, String action, + TransportRequest request, TransportRequestOptions options, + TransportResponseHandler handler) { + final RequestTracker requestTracker = transportService.requestTracker.get(); + assertNotNull("Request tracker wasn't set", requestTracker); + requestTracker.verifyAndTrackRequest(connection, action, request); + + if (transportService.interceptor != null) { + transportService.interceptor.sendRequest(connection, action, request, options, handler); + } else { + if (request instanceof FieldCapabilitiesNodeRequest) { + FieldCapabilitiesNodeRequest nodeRequest = (FieldCapabilitiesNodeRequest) request; + Set indices = nodeRequest.shardIds().stream().map(ShardId::getIndexName).collect(Collectors.toSet()); + transportService.sendResponse(handler, + randomNodeResponse(indices, Collections.emptyList(), Collections.emptySet())); + } else { + FieldCapabilitiesIndexRequest indexRequest = (FieldCapabilitiesIndexRequest) request; + transportService.sendResponse(handler, randomIndexResponse(indexRequest.index(), true)); + } + } + } + }); + transportService.start(); + return transportService; + } + + void setTransportInterceptor(TransportInterceptor.AsyncSender interceptor) { + this.interceptor = interceptor; + } + + @Override + protected void doClose() throws IOException { + super.doClose(); + threadPool.shutdown(); + requestTracker.get().verifyAfterComplete(); + } + + @SuppressWarnings("unchecked") + void sendResponse(TransportResponseHandler handler, TransportResponse resp) { + threadPool.executor(ThreadPool.Names.MANAGEMENT).submit(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + + @Override + protected void doRun() { + handler.handleResponse((T) resp); + } + }); + } + + void sendFailure(TransportResponseHandler handler, Exception e) { + threadPool.executor(ThreadPool.Names.MANAGEMENT).submit(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + + @Override + protected void doRun() { + handler.handleException(new TransportException(e)); + } + }); + } + } + + static FieldCapabilitiesRequest randomFieldCapRequest(boolean withFilter) { + final QueryBuilder filter = withFilter ? new RangeQueryBuilder("timestamp").from(randomNonNegativeLong()) : null; + return new FieldCapabilitiesRequest().fields("*").indexFilter(filter); + } + + static FieldCapabilitiesNodeResponse randomNodeResponse(Collection successIndices, + Collection failedShards, Set unmatchedShards) { + final Map failures = new HashMap<>(); + for (ShardId shardId : failedShards) { + failures.put(shardId, new IllegalStateException(randomAlphaOfLength(10))); + } + final List indexResponses = successIndices.stream() + .map(index -> randomIndexResponse(index, true)).collect(Collectors.toList()); + return new FieldCapabilitiesNodeResponse(indexResponses, failures, unmatchedShards); + } + + static class ResponseCollector { + final Map responses = ConcurrentCollections.newConcurrentMap(); + final Map failures = ConcurrentCollections.newConcurrentMap(); + final CountDownLatch latch = new CountDownLatch(1); + + void addIndexResponse(FieldCapabilitiesIndexResponse resp) { + assertTrue("Only matched responses are updated", resp.canMatch()); + final String index = resp.getIndexName(); + final FieldCapabilitiesIndexResponse existing = responses.put(index, resp); + assertNull("index [" + index + "] was responded already", existing); + assertThat("index [" + index + "]was failed already", index, not(in(failures.keySet()))); + } + + void addIndexFailure(String index, Exception e) { + final Exception existing = failures.put(index, e); + assertNull("index [" + index + "] was failed already", existing); + assertThat("index [" + index + "]was responded already", index, not(in(responses.keySet()))); + } + + void onComplete() { + latch.countDown(); + } + + void awaitCompletion() throws Exception { + assertTrue(latch.await(1, TimeUnit.MINUTES)); + } + } + + static Set assignedShardsOnNode(IndexRoutingTable routingTable, String nodeId) { + final Set shardIds = new HashSet<>(); + for (ShardRouting shardRouting : routingTable.randomAllActiveShardsIt()) { + if (shardRouting.currentNodeId().equals(nodeId)) { + shardIds.add(shardRouting.shardId()); + } + } + return shardIds; + } + + static Task newRandomParentTask() { + return new Task(0, "type", "action", randomAlphaOfLength(10), TaskId.EMPTY_TASK_ID, Collections.emptyMap()); + } + + private ClusterState newClusterState(Metadata metadata, DiscoveryNodes discoveryNodes) { + final RoutingTable.Builder routingTable = RoutingTable.builder(); + for (IndexMetadata imd : metadata) { + routingTable.addAsNew(metadata.index(imd.getIndex())); + } + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(discoveryNodes) + .metadata(metadata) + .routingTable(routingTable.build()) + .build(); + final Settings settings = Settings.EMPTY; + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final ArrayList deciders = new ArrayList<>(); + deciders.add(new EnableAllocationDecider(settings, clusterSettings)); + deciders.add(new SameShardAllocationDecider(settings, clusterSettings)); + deciders.add(new ReplicaAfterPrimaryActiveAllocationDecider()); + Collections.shuffle(deciders, random()); + final MockAllocationService allocationService = new MockAllocationService(new AllocationDeciders(deciders), + new TestGatewayAllocator(), new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE, + SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES); + return applyStartedShardsUntilNoChange(clusterState, allocationService); + } + + /** + * Returns the maximum number of rounds that a given index can be executed in case of failures. + */ + static int maxPossibleRounds(ClusterState clusterState, String index, boolean withFilter) { + final IndexRoutingTable routingTable = clusterState.routingTable().index(index); + if (withFilter) { + ObjectIntMap numCopiesPerShard = new ObjectIntHashMap<>(); + for (ShardRouting shard : routingTable.randomAllActiveShardsIt()) { + numCopiesPerShard.addTo(shard.shardId(), 1); + } + int maxRound = 0; + for (ObjectIntCursor numCopies : numCopiesPerShard) { + maxRound = Math.max(maxRound, numCopies.value); + } + return maxRound; + } else { + ObjectIntMap requestsPerNode = new ObjectIntHashMap<>(); + for (ShardRouting shard : routingTable.randomAllActiveShardsIt()) { + final String nodeId = shard.currentNodeId(); + if (clusterState.nodes().get(nodeId).getVersion().onOrAfter(GROUP_REQUESTS_VERSION)) { + requestsPerNode.put(nodeId, 1); + } else { + requestsPerNode.addTo(nodeId, 1); + } + } + int totalRequests = 0; + for (IntCursor cursor : requestsPerNode.values()) { + totalRequests += cursor.value; + } + return totalRequests; + } + } + + static Version randomNewVersion() { + return VersionUtils.randomVersionBetween(random(), GROUP_REQUESTS_VERSION, Version.CURRENT); + } + + static Version randomOldVersion() { + final Version previousVersion = VersionUtils.getPreviousVersion(GROUP_REQUESTS_VERSION); + return VersionUtils.randomVersionBetween(random(), previousVersion.minimumCompatibilityVersion(), previousVersion); + } + + static ClusterService mockClusterService(ClusterState clusterState) { + final ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(clusterState); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final OperationRouting operationRouting = new OperationRouting(Settings.EMPTY, clusterSettings); + when(clusterService.operationRouting()).thenReturn(operationRouting); + return clusterService; + } +}