diff --git a/qa/smoke-test-http/src/test/java/org/elasticsearch/http/BlockedSearcherRestCancellationTestCase.java b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/BlockedSearcherRestCancellationTestCase.java new file mode 100644 index 0000000000000..f82c48b944d8c --- /dev/null +++ b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/BlockedSearcherRestCancellationTestCase.java @@ -0,0 +1,185 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http; + +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Cancellable; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseListener; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.ReadOnlyEngine; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.index.translog.TranslogStats; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.EnginePlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Semaphore; +import java.util.function.Function; + +import static java.util.Collections.singletonList; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.not; + +/** + * Base class for testing that cancellation works at the REST layer for requests that need to acquire a searcher on one or more shards. + * + * It works by blocking searcher acquisition in order to catch the request mid-execution, and then to check that all the tasks are cancelled + * before they complete normally. + */ +public abstract class BlockedSearcherRestCancellationTestCase extends HttpSmokeTestCase { + + private static final Setting BLOCK_SEARCHER_SETTING + = Setting.boolSetting("index.block_searcher", false, Setting.Property.IndexScope); + + @Override + protected Collection> nodePlugins() { + return CollectionUtils.appendToCopy(super.nodePlugins(), SearcherBlockingPlugin.class); + } + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + void runTest(Request request, String actionPrefix) throws Exception { + + createIndex("test", Settings.builder().put(BLOCK_SEARCHER_SETTING.getKey(), true).build()); + ensureGreen("test"); + + final List searcherBlocks = new ArrayList<>(); + for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) { + for (final IndexService indexService : indicesService) { + for (final IndexShard indexShard : indexService) { + final Engine engine = IndexShardTestCase.getEngine(indexShard); + if (engine instanceof SearcherBlockingEngine) { + searcherBlocks.add(((SearcherBlockingEngine) engine).searcherBlock); + } + } + } + } + assertThat(searcherBlocks, not(empty())); + + final List releasables = new ArrayList<>(); + try { + for (final Semaphore searcherBlock : searcherBlocks) { + searcherBlock.acquire(); + releasables.add(searcherBlock::release); + } + + final PlainActionFuture future = new PlainActionFuture<>(); + logger.info("--> sending request"); + final Cancellable cancellable = getRestClient().performRequestAsync(request, new ResponseListener() { + @Override + public void onSuccess(Response response) { + future.onResponse(null); + } + + @Override + public void onFailure(Exception exception) { + future.onFailure(exception); + } + }); + + logger.info("--> waiting for task to start"); + assertBusy(() -> { + final List tasks = client().admin().cluster().prepareListTasks().get().getTasks(); + assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().startsWith(actionPrefix))); + }); + + logger.info("--> waiting for at least one task to hit a block"); + assertBusy(() -> assertTrue(searcherBlocks.stream().anyMatch(Semaphore::hasQueuedThreads))); + + logger.info("--> cancelling request"); + cancellable.cancel(); + expectThrows(CancellationException.class, future::actionGet); + + logger.info("--> checking that all tasks are marked as cancelled"); + assertBusy(() -> { + boolean foundTask = false; + for (TransportService transportService : internalCluster().getInstances(TransportService.class)) { + for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) { + if (cancellableTask.getAction().startsWith(actionPrefix)) { + foundTask = true; + assertTrue( + "task " + cancellableTask.getId() + "/" + cancellableTask.getAction() + " not cancelled", + cancellableTask.isCancelled()); + } + } + } + assertTrue("found no cancellable tasks", foundTask); + }); + } finally { + Releasables.close(releasables); + } + + logger.info("--> checking that all tasks have finished"); + assertBusy(() -> { + final List tasks = client().admin().cluster().prepareListTasks().get().getTasks(); + assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(actionPrefix))); + }); + } + + public static class SearcherBlockingPlugin extends Plugin implements EnginePlugin { + + @Override + public Optional getEngineFactory(IndexSettings indexSettings) { + if (BLOCK_SEARCHER_SETTING.get(indexSettings.getSettings())) { + return Optional.of(SearcherBlockingEngine::new); + } + return Optional.empty(); + } + + @Override + public List> getSettings() { + return singletonList(BLOCK_SEARCHER_SETTING); + } + } + + private static class SearcherBlockingEngine extends ReadOnlyEngine { + + final Semaphore searcherBlock = new Semaphore(1); + + SearcherBlockingEngine(EngineConfig config) { + super(config, null, new TranslogStats(), true, Function.identity(), true, false); + } + + @Override + public Searcher acquireSearcher(String source, SearcherScope scope, Function wrapper) throws EngineException { + try { + searcherBlock.acquire(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + searcherBlock.release(); + return super.acquireSearcher(source, scope, wrapper); + } + } + +} diff --git a/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndicesSegmentsRestCancellationIT.java b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndicesSegmentsRestCancellationIT.java index a556d6b8872f0..d2ab3b5a01636 100644 --- a/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndicesSegmentsRestCancellationIT.java +++ b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndicesSegmentsRestCancellationIT.java @@ -10,178 +10,15 @@ import org.apache.http.client.methods.HttpGet; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction; -import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.client.Cancellable; import org.elasticsearch.client.Request; -import org.elasticsearch.client.Response; -import org.elasticsearch.client.ResponseListener; -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.lease.Releasables; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.CollectionUtils; -import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.engine.EngineConfig; -import org.elasticsearch.index.engine.EngineException; -import org.elasticsearch.index.engine.EngineFactory; -import org.elasticsearch.index.engine.ReadOnlyEngine; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.IndexShardTestCase; -import org.elasticsearch.index.translog.TranslogStats; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.plugins.EnginePlugin; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.tasks.CancellableTask; -import org.elasticsearch.tasks.TaskInfo; -import org.elasticsearch.transport.TransportService; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CancellationException; -import java.util.concurrent.Semaphore; -import java.util.function.Function; - -import static java.util.Collections.singletonList; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.not; - -public class IndicesSegmentsRestCancellationIT extends HttpSmokeTestCase { - - public static final Setting BLOCK_SEARCHER_SETTING - = Setting.boolSetting("index.block_searcher", false, Setting.Property.IndexScope); - - @Override - protected Collection> nodePlugins() { - return CollectionUtils.appendToCopy(super.nodePlugins(), SearcherBlockingPlugin.class); - } - - @Override - protected boolean addMockInternalEngine() { - return false; - } +public class IndicesSegmentsRestCancellationIT extends BlockedSearcherRestCancellationTestCase { public void testIndicesSegmentsRestCancellation() throws Exception { - runTest(new Request(HttpGet.METHOD_NAME, "/_segments")); + runTest(new Request(HttpGet.METHOD_NAME, "/_segments"), IndicesSegmentsAction.NAME); } public void testCatSegmentsRestCancellation() throws Exception { - runTest(new Request(HttpGet.METHOD_NAME, "/_cat/segments")); - } - - private void runTest(Request request) throws Exception { - - createIndex("test", Settings.builder().put(BLOCK_SEARCHER_SETTING.getKey(), true).build()); - ensureGreen("test"); - - final List searcherBlocks = new ArrayList<>(); - for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) { - for (final IndexService indexService : indicesService) { - for (final IndexShard indexShard : indexService) { - final Engine engine = IndexShardTestCase.getEngine(indexShard); - if (engine instanceof SearcherBlockingEngine) { - searcherBlocks.add(((SearcherBlockingEngine) engine).searcherBlock); - } - } - } - } - assertThat(searcherBlocks, not(empty())); - - final List releasables = new ArrayList<>(); - try { - for (final Semaphore searcherBlock : searcherBlocks) { - searcherBlock.acquire(); - releasables.add(searcherBlock::release); - } - - final PlainActionFuture future = new PlainActionFuture<>(); - logger.info("--> sending indices segments request"); - final Cancellable cancellable = getRestClient().performRequestAsync(request, new ResponseListener() { - @Override - public void onSuccess(Response response) { - future.onResponse(null); - } - - @Override - public void onFailure(Exception exception) { - future.onFailure(exception); - } - }); - - logger.info("--> waiting for task to start"); - assertBusy(() -> { - final List tasks = client().admin().cluster().prepareListTasks().get().getTasks(); - assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().startsWith(IndicesSegmentsAction.NAME))); - }); - - logger.info("--> waiting for at least one task to hit a block"); - assertBusy(() -> assertTrue(searcherBlocks.stream().anyMatch(Semaphore::hasQueuedThreads))); - - logger.info("--> cancelling request"); - cancellable.cancel(); - expectThrows(CancellationException.class, future::actionGet); - - logger.info("--> checking that all indices segments tasks are marked as cancelled"); - assertBusy(() -> { - boolean foundTask = false; - for (TransportService transportService : internalCluster().getInstances(TransportService.class)) { - for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) { - if (cancellableTask.getAction().startsWith(IndicesSegmentsAction.NAME)) { - foundTask = true; - assertTrue("task " + cancellableTask.getId() + " not cancelled", cancellableTask.isCancelled()); - } - } - } - assertTrue("found no cancellable tasks", foundTask); - }); - } finally { - Releasables.close(releasables); - } - - logger.info("--> checking that all indices segments tasks have finished"); - assertBusy(() -> { - final List tasks = client().admin().cluster().prepareListTasks().get().getTasks(); - assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(IndicesSegmentsAction.NAME))); - }); - } - - public static class SearcherBlockingPlugin extends Plugin implements EnginePlugin { - - @Override - public Optional getEngineFactory(IndexSettings indexSettings) { - if (BLOCK_SEARCHER_SETTING.get(indexSettings.getSettings())) { - return Optional.of(SearcherBlockingEngine::new); - } - return Optional.empty(); - } - - @Override - public List> getSettings() { - return singletonList(BLOCK_SEARCHER_SETTING); - } - } - - private static class SearcherBlockingEngine extends ReadOnlyEngine { - - final Semaphore searcherBlock = new Semaphore(1); - - SearcherBlockingEngine(EngineConfig config) { - super(config, null, new TranslogStats(), true, Function.identity(), true, false); - } - - @Override - public Searcher acquireSearcher(String source, SearcherScope scope, Function wrapper) throws EngineException { - try { - searcherBlock.acquire(); - } catch (InterruptedException e) { - throw new AssertionError(e); - } - searcherBlock.release(); - return super.acquireSearcher(source, scope, wrapper); - } + runTest(new Request(HttpGet.METHOD_NAME, "/_cat/segments"), IndicesSegmentsAction.NAME); } } diff --git a/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndicesStatsRestCancellationIT.java b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndicesStatsRestCancellationIT.java new file mode 100644 index 0000000000000..cce4899122566 --- /dev/null +++ b/qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndicesStatsRestCancellationIT.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http; + +import org.apache.http.client.methods.HttpGet; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; +import org.elasticsearch.client.Request; +import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; +import org.elasticsearch.common.settings.Settings; + +public class IndicesStatsRestCancellationIT extends BlockedSearcherRestCancellationTestCase { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + // disable internal cluster info service to avoid internal indices stats calls + .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false) + .build(); + } + + public void testIndicesStatsRestCancellation() throws Exception { + runTest(new Request(HttpGet.METHOD_NAME, "/_stats"), IndicesStatsAction.NAME); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java index 086f3e1ff8e6e..7b4f9d2eddec1 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java @@ -12,8 +12,12 @@ import org.elasticsearch.action.support.broadcast.BroadcastRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import java.io.IOException; +import java.util.Map; /** * A request to get indices level stats. Allow to enable different stats to be returned. @@ -275,4 +279,9 @@ public void writeTo(StreamOutput out) throws IOException { public boolean includeDataStreams() { return true; } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index 17e592775f7c7..e856f52ecb102 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -85,6 +86,7 @@ protected IndicesStatsRequest readRequestFrom(StreamInput in) throws IOException @Override protected ShardStats shardOperation(IndicesStatsRequest request, ShardRouting shardRouting, Task task) { + assert task instanceof CancellableTask; IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex()); IndexShard indexShard = indexService.getShard(shardRouting.shardId().id()); // if we don't have the routing entry yet, we need it stats wise, we treat it as if the shard is not ready yet diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsAction.java index 093f520facf08..3d84403e63f95 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestToXContentListener; import java.io.IOException; @@ -119,7 +120,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC indicesStatsRequest.includeUnloadedSegments(request.paramAsBoolean("include_unloaded_segments", false)); } - return channel -> client.admin().indices().stats(indicesStatsRequest, new RestToXContentListener<>(channel)); + return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()) + .admin().indices().stats(indicesStatsRequest, new RestToXContentListener<>(channel)); } @Override diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index 11465c40b8323..1b378cdb3943b 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -604,6 +604,7 @@ public void testCcrRepositoryFailsToFetchSnapshotShardSizes() throws Exception { ) { simulatedFailures.incrementAndGet(); channel.sendResponse(new ElasticsearchException("simulated")); + return; } } handler.messageReceived(request, channel, task);