Skip to content

Commit

Permalink
Add retry for node level field caps requests (#78647)
Browse files Browse the repository at this point in the history
This adds a retry mechanism for node level field caps requests 
introduced in #77047.
  • Loading branch information
dnhatn authored Oct 14, 2021
1 parent 058425a commit 6f31965
Show file tree
Hide file tree
Showing 14 changed files with 1,708 additions and 465 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,34 @@

import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.DocumentParserContext;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MetadataFieldMapper;
import org.elasticsearch.index.mapper.DocumentParserContext;
import org.elasticsearch.index.query.AbstractQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand All @@ -39,16 +57,28 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;

import static java.util.Collections.singletonList;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;

public class FieldCapabilitiesIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> getMockPlugins() {
final Collection<Class<? extends Plugin>> plugins = new ArrayList<>(super.getMockPlugins());
plugins.add(MockTransportService.TestPlugin.class);
return plugins;
}

@Override
@Before
public void setUp() throws Exception {
Expand Down Expand Up @@ -313,6 +343,163 @@ public void testFailures() throws InterruptedException {
assertEquals("I throw because I choose to.", ex.getMessage());
}

private void populateTimeRangeIndices() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
assertAcked(prepareCreate("log-index-1")
.setSettings(Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
.addMapping("_doc", "timestamp", "type=date", "field1", "type=keyword"));
assertAcked(prepareCreate("log-index-2")
.setSettings(Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
.addMapping("_doc", "timestamp", "type=date", "field1", "type=long"));
List<IndexRequestBuilder> reqs = new ArrayList<>();
reqs.add(client().prepareIndex("log-index-1", "_doc").setSource("timestamp", "2015-07-08"));
reqs.add(client().prepareIndex("log-index-1", "_doc").setSource("timestamp", "2018-07-08"));
reqs.add(client().prepareIndex("log-index-1", "_doc").setSource("timestamp", "2020-03-03"));
reqs.add(client().prepareIndex("log-index-1", "_doc").setSource("timestamp", "2020-09-09"));
reqs.add(client().prepareIndex("log-index-2", "_doc").setSource("timestamp", "2019-10-12"));
reqs.add(client().prepareIndex("log-index-2", "_doc").setSource("timestamp", "2020-02-02"));
reqs.add(client().prepareIndex("log-index-2", "_doc").setSource("timestamp", "2020-10-10"));
indexRandom(true, reqs);
ensureGreen("log-index-1", "log-index-2");
client().admin().indices().prepareRefresh("log-index-1", "log-index-2").get();
}

public void testTargetNodeFails() throws Exception {
populateTimeRangeIndices();
try {
final AtomicBoolean failedRequest = new AtomicBoolean();
for (String node : internalCluster().getNodeNames()) {
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
transportService.addRequestHandlingBehavior(TransportFieldCapabilitiesAction.ACTION_NODE_NAME,
(handler, request, channel, task) -> {
if (failedRequest.compareAndSet(false, true)) {
channel.sendResponse(new CircuitBreakingException("Simulated", CircuitBreaker.Durability.TRANSIENT));
} else {
handler.messageReceived(request, channel, task);
}
});
}
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
request.indices("log-index-*");
request.fields("*");
if (randomBoolean()) {
request.indexFilter(QueryBuilders.rangeQuery("timestamp").gte("2020-01-01"));
}
final FieldCapabilitiesResponse response = client().execute(FieldCapabilitiesAction.INSTANCE, request).actionGet();
assertTrue(failedRequest.get());
assertThat(response.getIndices(), arrayContainingInAnyOrder("log-index-1", "log-index-2"));
assertThat(response.getField("field1"), aMapWithSize(2));
assertThat(response.getField("field1"), hasKey("long"));
assertThat(response.getField("field1"), hasKey("keyword"));
} finally {
for (String node : internalCluster().getNodeNames()) {
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
transportService.clearAllRules();
}
}
}

public void testNoActiveCopy() throws Exception {
assertAcked(prepareCreate("log-index-inactive")
.setSettings(Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.routing.allocation.require._id", "unknown"))
.setWaitForActiveShards(ActiveShardCount.NONE)
.addMapping("_doc", "timestamp", "type=date", "field1", "type=keyword"));
{
final ElasticsearchException ex =
expectThrows(ElasticsearchException.class, () -> client().prepareFieldCaps("log-index-*").setFields("*").get());
assertThat(ex.getMessage(), equalTo("index [log-index-inactive] has no active shard copy"));
}
{
populateTimeRangeIndices();
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
request.indices("log-index-*");
request.fields("*");
if (randomBoolean()) {
request.indexFilter(QueryBuilders.rangeQuery("timestamp").gte("2020-01-01"));
}
final FieldCapabilitiesResponse response = client().execute(FieldCapabilitiesAction.INSTANCE, request).actionGet();
assertThat(response.getIndices(), arrayContainingInAnyOrder("log-index-1", "log-index-2"));
assertThat(response.getField("field1"), aMapWithSize(2));
assertThat(response.getField("field1"), hasKey("long"));
assertThat(response.getField("field1"), hasKey("long"));

assertThat(response.getFailures(), hasSize(1));
final FieldCapabilitiesFailure failure = response.getFailures().get(0);
assertThat(failure.getIndices(), arrayContainingInAnyOrder("log-index-inactive"));
assertThat(failure.getException().getMessage(), equalTo("index [log-index-inactive] has no active shard copy"));
}
}

private void moveOrCloseShardsOnNodes(String nodeName) throws Exception {
final IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeName);
for (IndexService indexService : indicesService) {
for (IndexShard indexShard : indexService) {
if (randomBoolean()) {
indexShard.close("test", randomBoolean());
} else if (randomBoolean()) {
final ShardId shardId = indexShard.shardId();
final String[] nodeNames = internalCluster().getNodeNames();
final String newNodeName = randomValueOtherThanMany(n -> nodeName.equals(n) == false, () -> randomFrom(nodeNames));
DiscoveryNode fromNode = null;
DiscoveryNode toNode = null;
for (DiscoveryNode node : clusterService().state().nodes()) {
if (node.getName().equals(nodeName)) {
fromNode = node;
}
if (node.getName().equals(newNodeName)) {
toNode = node;
}
}
assertNotNull(fromNode);
assertNotNull(toNode);
client().admin().cluster().prepareReroute()
.add(new MoveAllocationCommand(shardId.getIndexName(), shardId.id(), fromNode.getId(), toNode.getId()))
.execute().actionGet();
}
}
}
}

public void testRelocation() throws Exception {
populateTimeRangeIndices();
try {
final AtomicBoolean relocated = new AtomicBoolean();
for (String node : internalCluster().getNodeNames()) {
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
transportService.addRequestHandlingBehavior(TransportFieldCapabilitiesAction.ACTION_NODE_NAME,
(handler, request, channel, task) -> {
if (relocated.compareAndSet(false, true)) {
moveOrCloseShardsOnNodes(node);
}
handler.messageReceived(request, channel, task);
});
}
FieldCapabilitiesRequest request = new FieldCapabilitiesRequest();
request.indices("log-index-*");
request.fields("*");
if (randomBoolean()) {
request.indexFilter(QueryBuilders.rangeQuery("timestamp").gte("2020-01-01"));
}
final FieldCapabilitiesResponse response = client().execute(FieldCapabilitiesAction.INSTANCE, request).actionGet();
assertThat(response.getIndices(), arrayContainingInAnyOrder("log-index-1", "log-index-2"));
assertThat(response.getField("field1"), aMapWithSize(2));
assertThat(response.getField("field1"), hasKey("long"));
assertThat(response.getField("field1"), hasKey("long"));
} finally {
for (String node : internalCluster().getNodeNames()) {
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
transportService.clearAllRules();
}
}
}

private void assertIndices(FieldCapabilitiesResponse response, String... indices) {
assertNotNull(response.getIndices());
Arrays.sort(indices);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@
import org.elasticsearch.action.explain.TransportExplainAction;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesAction;
import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction;
import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesIndexAction;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.MultiGetAction;
import org.elasticsearch.action.get.TransportGetAction;
Expand Down Expand Up @@ -635,8 +634,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(GetScriptContextAction.INSTANCE, TransportGetScriptContextAction.class);
actions.register(GetScriptLanguageAction.INSTANCE, TransportGetScriptLanguageAction.class);

actions.register(FieldCapabilitiesAction.INSTANCE, TransportFieldCapabilitiesAction.class,
TransportFieldCapabilitiesIndexAction.class);
actions.register(FieldCapabilitiesAction.INSTANCE, TransportFieldCapabilitiesAction.class);

actions.register(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class);
actions.register(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,25 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

public class FieldCapabilitiesIndexRequest extends ActionRequest implements IndicesRequest {

public static final IndicesOptions INDICES_OPTIONS = IndicesOptions.strictSingleIndexNoExpandForbidClosed();

private final String index;
private final String[] fields;
private final OriginalIndices originalIndices;
private final QueryBuilder indexFilter;
private final long nowInMillis;
private final Map<String, Object> runtimeFields;

private ShardId shardId;
private final ShardId shardId;

// For serialization
FieldCapabilitiesIndexRequest(StreamInput in) throws IOException {
super(in);
shardId = in.readOptionalWriteable(ShardId::new);
index = in.readOptionalString();
if (in.getVersion().before(Version.V_7_16_0)) {
in.readOptionalString(); // index
}
fields = in.readStringArray();
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
originalIndices = OriginalIndices.readOriginalIndices(in);
Expand All @@ -54,15 +53,15 @@ public class FieldCapabilitiesIndexRequest extends ActionRequest implements Indi
}

FieldCapabilitiesIndexRequest(String[] fields,
String index,
ShardId shardId,
OriginalIndices originalIndices,
QueryBuilder indexFilter,
long nowInMillis,
Map<String, Object> runtimeFields) {
if (fields == null || fields.length == 0) {
throw new IllegalArgumentException("specified fields can't be null or empty");
}
this.index = Objects.requireNonNull(index);
this.shardId = shardId;
this.fields = fields;
this.originalIndices = originalIndices;
this.indexFilter = indexFilter;
Expand All @@ -85,7 +84,7 @@ public IndicesOptions indicesOptions() {
}

public String index() {
return index;
return shardId.getIndexName();
}

public QueryBuilder indexFilter() {
Expand All @@ -104,16 +103,14 @@ public long nowInMillis() {
return nowInMillis;
}

FieldCapabilitiesIndexRequest shardId(ShardId shardId) {
this.shardId = shardId;
return this;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalWriteable(shardId);
out.writeOptionalString(index);
if (out.getVersion().before(Version.V_7_16_0)) {
out.writeOptionalString(shardId.getIndexName());
}
out.writeStringArray(fields);
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
OriginalIndices.writeOriginalIndices(originalIndices, out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
import java.util.Map;
import java.util.Objects;

/**
* Response for {@link TransportFieldCapabilitiesIndexAction}.
*/
public class FieldCapabilitiesIndexResponse extends ActionResponse implements Writeable {
private final String indexName;
private final Map<String, IndexFieldCapabilities> responseMap;
Expand Down
Loading

0 comments on commit 6f31965

Please sign in to comment.