From 423a0a5b67a02e5a8540bfd5612e0577da4ef0ff Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 8 Mar 2022 16:22:46 -0500 Subject: [PATCH] Relax error handling in transport disk usage API (#84774) Relax the failure handling to cover other exceptions such as BroadcastShardOperationFailedException. --- docs/changelog/84774.yaml | 5 ++ .../diskusage/IndexDiskUsageAnalyzerIT.java | 87 +++++++++++++++++++ .../TransportAnalyzeIndexDiskUsageAction.java | 7 +- 3 files changed, 97 insertions(+), 2 deletions(-) create mode 100644 docs/changelog/84774.yaml diff --git a/docs/changelog/84774.yaml b/docs/changelog/84774.yaml new file mode 100644 index 0000000000000..ab4aabc9720a7 --- /dev/null +++ b/docs/changelog/84774.yaml @@ -0,0 +1,5 @@ +pr: 84774 +summary: Increase store ref before analyzing disk usage +area: Search +type: bug +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/diskusage/IndexDiskUsageAnalyzerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/diskusage/IndexDiskUsageAnalyzerIT.java index 5ce5cb49af4ac..c1fafcb530595 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/diskusage/IndexDiskUsageAnalyzerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/diskusage/IndexDiskUsageAnalyzerIT.java @@ -12,15 +12,65 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.plugins.EnginePlugin; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentFactory; +import org.junit.Before; + +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.IntStream; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; public class IndexDiskUsageAnalyzerIT extends ESIntegTestCase { + @Override + protected boolean addMockInternalEngine() { + return false; + } + + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopy(super.nodePlugins(), EngineTestPlugin.class); + } + + private static final Set failOnFlushShards = Sets.newConcurrentHashSet(); + + public static class EngineTestPlugin extends Plugin implements EnginePlugin { + @Override + public Optional getEngineFactory(IndexSettings indexSettings) { + return Optional.of(config -> new InternalEngine(config) { + @Override + public void flush(boolean force, boolean waitIfOngoing) throws EngineException { + final ShardId shardId = config.getShardId(); + if (failOnFlushShards.contains(shardId)) { + throw new EngineException(shardId, "simulated IO"); + } + super.flush(force, waitIfOngoing); + } + }); + } + } + + @Before + public void resetFailOnFlush() throws Exception { + failOnFlushShards.clear(); + } + public void testSimple() throws Exception { final XContentBuilder mapping = XContentFactory.jsonBuilder(); mapping.startObject(); @@ -98,6 +148,43 @@ public void testSimple() throws Exception { assertMetadataFields(stats); } + public void testFailOnFlush() throws Exception { + final String indexName = "test-index"; + int numberOfShards = between(1, 5); + client().admin() + .indices() + .prepareCreate(indexName) + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 1)) + ) + .get(); + ensureGreen(indexName); + int numDocs = randomIntBetween(1, 10); + for (int i = 0; i < numDocs; i++) { + int value = randomIntBetween(1, 10); + final XContentBuilder doc = XContentFactory.jsonBuilder() + .startObject() + .field("english_text", English.intToEnglish(value)) + .field("value", value) + .endObject(); + client().prepareIndex(indexName).setId("id-" + i).setSource(doc).get(); + } + Index index = clusterService().state().metadata().index(indexName).getIndex(); + List failedShards = randomSubsetOf( + between(1, numberOfShards), + IntStream.range(0, numberOfShards).mapToObj(n -> new ShardId(index, n)).toList() + ); + failOnFlushShards.addAll(failedShards); + AnalyzeIndexDiskUsageResponse resp = client().execute( + AnalyzeIndexDiskUsageAction.INSTANCE, + new AnalyzeIndexDiskUsageRequest(new String[] { indexName }, AnalyzeIndexDiskUsageRequest.DEFAULT_INDICES_OPTIONS, true) + ).actionGet(); + assertThat(resp.getTotalShards(), equalTo(numberOfShards)); + assertThat(resp.getFailedShards(), equalTo(failedShards.size())); + } + void assertMetadataFields(IndexDiskUsageStats stats) { final IndexDiskUsageStats.PerFieldDiskUsage sourceField = stats.getFields().get("_source"); assertThat(sourceField.getInvertedIndexBytes(), equalTo(0L)); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/diskusage/TransportAnalyzeIndexDiskUsageAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/diskusage/TransportAnalyzeIndexDiskUsageAction.java index b083f1b221a1e..dabba637ea799 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/diskusage/TransportAnalyzeIndexDiskUsageAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/diskusage/TransportAnalyzeIndexDiskUsageAction.java @@ -8,6 +8,7 @@ package org.elasticsearch.action.admin.indices.diskusage; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.support.ActionFilters; @@ -109,8 +110,10 @@ protected AnalyzeIndexDiskUsageResponse newResponse( if (r instanceof AnalyzeDiskUsageShardResponse resp) { ++successfulShards; combined.compute(resp.getIndex(), (k, v) -> v == null ? resp.stats : v.add(resp.stats)); - } else if (r instanceof DefaultShardOperationFailedException) { - shardFailures.add((DefaultShardOperationFailedException) r); + } else if (r instanceof DefaultShardOperationFailedException e) { + shardFailures.add(e); + } else if (r instanceof Exception e) { + shardFailures.add(new DefaultShardOperationFailedException(ExceptionsHelper.convertToElastic(e))); } else { assert false : "unknown response [" + r + "]"; throw new IllegalStateException("unknown response [" + r + "]");