Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make indices stats requests cancellable #69174

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http;

import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Cancellable;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.ReadOnlyEngine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.not;

/**
* Base class for testing that cancellation works at the REST layer for requests that need to acquire a searcher on one or more shards.
*
* It works by blocking searcher acquisition in order to catch the request mid-execution, and then to check that all the tasks are cancelled
* before they complete normally.
*/
public abstract class BlockedSearcherRestCancellationTestCase extends HttpSmokeTestCase {

private static final Setting<Boolean> BLOCK_SEARCHER_SETTING
= Setting.boolSetting("index.block_searcher", false, Setting.Property.IndexScope);

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), SearcherBlockingPlugin.class);
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

void runTest(Request request, String actionPrefix) throws Exception {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although there's a lot of lines changed here, all I did was generalise this method to wait on different action prefixes and then extract a superclass.


createIndex("test", Settings.builder().put(BLOCK_SEARCHER_SETTING.getKey(), true).build());
ensureGreen("test");

final List<Semaphore> searcherBlocks = new ArrayList<>();
for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
for (final IndexService indexService : indicesService) {
for (final IndexShard indexShard : indexService) {
final Engine engine = IndexShardTestCase.getEngine(indexShard);
if (engine instanceof SearcherBlockingEngine) {
searcherBlocks.add(((SearcherBlockingEngine) engine).searcherBlock);
}
}
}
}
assertThat(searcherBlocks, not(empty()));

final List<Releasable> releasables = new ArrayList<>();
try {
for (final Semaphore searcherBlock : searcherBlocks) {
searcherBlock.acquire();
releasables.add(searcherBlock::release);
}

final PlainActionFuture<Void> future = new PlainActionFuture<>();
logger.info("--> sending request");
final Cancellable cancellable = getRestClient().performRequestAsync(request, new ResponseListener() {
@Override
public void onSuccess(Response response) {
future.onResponse(null);
}

@Override
public void onFailure(Exception exception) {
future.onFailure(exception);
}
});

logger.info("--> waiting for task to start");
assertBusy(() -> {
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().startsWith(actionPrefix)));
});

logger.info("--> waiting for at least one task to hit a block");
assertBusy(() -> assertTrue(searcherBlocks.stream().anyMatch(Semaphore::hasQueuedThreads)));

logger.info("--> cancelling request");
cancellable.cancel();
expectThrows(CancellationException.class, future::actionGet);

logger.info("--> checking that all tasks are marked as cancelled");
assertBusy(() -> {
boolean foundTask = false;
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) {
if (cancellableTask.getAction().startsWith(actionPrefix)) {
foundTask = true;
assertTrue(
"task " + cancellableTask.getId() + "/" + cancellableTask.getAction() + " not cancelled",
cancellableTask.isCancelled());
}
}
}
assertTrue("found no cancellable tasks", foundTask);
});
} finally {
Releasables.close(releasables);
}

logger.info("--> checking that all tasks have finished");
assertBusy(() -> {
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(actionPrefix)));
});
}

public static class SearcherBlockingPlugin extends Plugin implements EnginePlugin {

@Override
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
if (BLOCK_SEARCHER_SETTING.get(indexSettings.getSettings())) {
return Optional.of(SearcherBlockingEngine::new);
}
return Optional.empty();
}

@Override
public List<Setting<?>> getSettings() {
return singletonList(BLOCK_SEARCHER_SETTING);
}
}

private static class SearcherBlockingEngine extends ReadOnlyEngine {

final Semaphore searcherBlock = new Semaphore(1);

SearcherBlockingEngine(EngineConfig config) {
super(config, null, new TranslogStats(), true, Function.identity(), true);
}

@Override
public Searcher acquireSearcher(String source, SearcherScope scope, Function<Searcher, Searcher> wrapper) throws EngineException {
try {
searcherBlock.acquire();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
searcherBlock.release();
return super.acquireSearcher(source, scope, wrapper);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,178 +10,14 @@

import org.apache.http.client.methods.HttpGet;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Cancellable;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.ReadOnlyEngine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.not;

public class IndicesSegmentsRestCancellationIT extends HttpSmokeTestCase {

public static final Setting<Boolean> BLOCK_SEARCHER_SETTING
= Setting.boolSetting("index.block_searcher", false, Setting.Property.IndexScope);

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), SearcherBlockingPlugin.class);
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

public class IndicesSegmentsRestCancellationIT extends BlockedSearcherRestCancellationTestCase {
public void testIndicesSegmentsRestCancellation() throws Exception {
runTest(new Request(HttpGet.METHOD_NAME, "/_segments"));
runTest(new Request(HttpGet.METHOD_NAME, "/_segments"), IndicesSegmentsAction.NAME);
}

public void testCatSegmentsRestCancellation() throws Exception {
runTest(new Request(HttpGet.METHOD_NAME, "/_cat/segments"));
}

private void runTest(Request request) throws Exception {

createIndex("test", Settings.builder().put(BLOCK_SEARCHER_SETTING.getKey(), true).build());
ensureGreen("test");

final List<Semaphore> searcherBlocks = new ArrayList<>();
for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
for (final IndexService indexService : indicesService) {
for (final IndexShard indexShard : indexService) {
final Engine engine = IndexShardTestCase.getEngine(indexShard);
if (engine instanceof SearcherBlockingEngine) {
searcherBlocks.add(((SearcherBlockingEngine) engine).searcherBlock);
}
}
}
}
assertThat(searcherBlocks, not(empty()));

final List<Releasable> releasables = new ArrayList<>();
try {
for (final Semaphore searcherBlock : searcherBlocks) {
searcherBlock.acquire();
releasables.add(searcherBlock::release);
}

final PlainActionFuture<Void> future = new PlainActionFuture<>();
logger.info("--> sending indices segments request");
final Cancellable cancellable = getRestClient().performRequestAsync(request, new ResponseListener() {
@Override
public void onSuccess(Response response) {
future.onResponse(null);
}

@Override
public void onFailure(Exception exception) {
future.onFailure(exception);
}
});

logger.info("--> waiting for task to start");
assertBusy(() -> {
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
assertTrue(tasks.toString(), tasks.stream().anyMatch(t -> t.getAction().startsWith(IndicesSegmentsAction.NAME)));
});

logger.info("--> waiting for at least one task to hit a block");
assertBusy(() -> assertTrue(searcherBlocks.stream().anyMatch(Semaphore::hasQueuedThreads)));

logger.info("--> cancelling request");
cancellable.cancel();
expectThrows(CancellationException.class, future::actionGet);

logger.info("--> checking that all indices segments tasks are marked as cancelled");
assertBusy(() -> {
boolean foundTask = false;
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
for (CancellableTask cancellableTask : transportService.getTaskManager().getCancellableTasks().values()) {
if (cancellableTask.getAction().startsWith(IndicesSegmentsAction.NAME)) {
foundTask = true;
assertTrue("task " + cancellableTask.getId() + " not cancelled", cancellableTask.isCancelled());
}
}
}
assertTrue("found no cancellable tasks", foundTask);
});
} finally {
Releasables.close(releasables);
}

logger.info("--> checking that all indices segments tasks have finished");
assertBusy(() -> {
final List<TaskInfo> tasks = client().admin().cluster().prepareListTasks().get().getTasks();
assertTrue(tasks.toString(), tasks.stream().noneMatch(t -> t.getAction().startsWith(IndicesSegmentsAction.NAME)));
});
runTest(new Request(HttpGet.METHOD_NAME, "/_cat/segments"), IndicesSegmentsAction.NAME);
}

public static class SearcherBlockingPlugin extends Plugin implements EnginePlugin {

@Override
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
if (BLOCK_SEARCHER_SETTING.get(indexSettings.getSettings())) {
return Optional.of(SearcherBlockingEngine::new);
}
return Optional.empty();
}

@Override
public List<Setting<?>> getSettings() {
return singletonList(BLOCK_SEARCHER_SETTING);
}
}

private static class SearcherBlockingEngine extends ReadOnlyEngine {

final Semaphore searcherBlock = new Semaphore(1);

SearcherBlockingEngine(EngineConfig config) {
super(config, null, new TranslogStats(), true, Function.identity(), true);
}

@Override
public Searcher acquireSearcher(String source, SearcherScope scope, Function<Searcher, Searcher> wrapper) throws EngineException {
try {
searcherBlock.acquire();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
searcherBlock.release();
return super.acquireSearcher(source, scope, wrapper);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http;

import org.apache.http.client.methods.HttpGet;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.client.Request;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.common.settings.Settings;

public class IndicesStatsRestCancellationIT extends BlockedSearcherRestCancellationTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
// disable internal cluster info service to avoid internal indices stats calls
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.getKey(), false)
.build();
}

public void testIndicesStatsRestCancellation() throws Exception {
runTest(new Request(HttpGet.METHOD_NAME, "/_stats"), IndicesStatsAction.NAME);
}
}
Loading