-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cancel search task on connection close #43332
Changes from 25 commits
1f45077
6697ff6
9361f23
71f424d
f71dd5e
4dc5f9d
9b101e5
0654c91
05a428d
2157230
f6d7489
d581ab6
40ae005
07f7cd3
468abb8
b022f45
2e6e0a2
21f897f
591cd54
646eb8a
7b9621c
bfd154d
f74d087
9bfd490
75bf6c3
868e01a
2d886e6
98e74b5
8577658
adc710a
6f83fd8
862c111
b4a7d5d
83497ab
e2a75b3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,247 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.elasticsearch.http; | ||
|
||
import org.apache.http.HttpHost; | ||
import org.apache.http.HttpResponse; | ||
import org.apache.http.client.methods.HttpPost; | ||
import org.apache.http.entity.ContentType; | ||
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; | ||
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; | ||
import org.apache.http.nio.entity.NStringEntity; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.lucene.util.SetOnce; | ||
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; | ||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; | ||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; | ||
import org.elasticsearch.action.bulk.BulkRequestBuilder; | ||
import org.elasticsearch.action.search.SearchAction; | ||
import org.elasticsearch.action.support.WriteRequest; | ||
import org.elasticsearch.common.Strings; | ||
import org.elasticsearch.common.network.NetworkAddress; | ||
import org.elasticsearch.common.transport.TransportAddress; | ||
import org.elasticsearch.plugins.Plugin; | ||
import org.elasticsearch.plugins.PluginsService; | ||
import org.elasticsearch.script.MockScriptPlugin; | ||
import org.elasticsearch.script.Script; | ||
import org.elasticsearch.script.ScriptType; | ||
import org.elasticsearch.search.builder.SearchSourceBuilder; | ||
import org.elasticsearch.search.lookup.LeafFieldsLookup; | ||
import org.elasticsearch.tasks.CancellableTask; | ||
import org.elasticsearch.tasks.Task; | ||
import org.elasticsearch.tasks.TaskId; | ||
import org.elasticsearch.tasks.TaskInfo; | ||
import org.elasticsearch.tasks.TaskManager; | ||
import org.elasticsearch.transport.TransportService; | ||
|
||
import java.net.InetSocketAddress; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.CancellationException; | ||
import java.util.concurrent.Future; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.function.Function; | ||
|
||
import static org.elasticsearch.http.SearchHttpCancellationIT.ScriptedBlockPlugin.SCRIPT_NAME; | ||
import static org.elasticsearch.index.query.QueryBuilders.scriptQuery; | ||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; | ||
import static org.hamcrest.Matchers.greaterThan; | ||
import static org.hamcrest.Matchers.instanceOf; | ||
|
||
public class SearchHttpCancellationIT extends HttpSmokeTestCase { | ||
|
||
@Override | ||
protected Collection<Class<? extends Plugin>> nodePlugins() { | ||
List<Class<? extends Plugin>> plugins = new ArrayList<>(); | ||
plugins.add(ScriptedBlockPlugin.class); | ||
plugins.addAll(super.nodePlugins()); | ||
return plugins; | ||
} | ||
|
||
public void testAutomaticCancellationDuringQueryPhase() throws Exception { | ||
List<ScriptedBlockPlugin> plugins = initBlockFactory(); | ||
indexTestData(); | ||
|
||
HttpAsyncClientBuilder clientBuilder = HttpAsyncClientBuilder.create(); | ||
|
||
try (CloseableHttpAsyncClient httpClient = clientBuilder.build()) { | ||
httpClient.start(); | ||
|
||
List<HttpHost> hosts = new ArrayList<>(); | ||
Map<String, String> nodeIdToName = new HashMap<>(); | ||
readNodesInfo(hosts, nodeIdToName); | ||
|
||
SearchSourceBuilder searchSource = new SearchSourceBuilder().query(scriptQuery( | ||
new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))); | ||
|
||
HttpPost httpPost = new HttpPost("/test/_search"); | ||
httpPost.setEntity(new NStringEntity(Strings.toString(searchSource), ContentType.APPLICATION_JSON)); | ||
|
||
Future<HttpResponse> future = httpClient.execute(randomFrom(hosts), httpPost, null); | ||
awaitForBlock(plugins); | ||
|
||
httpPost.abort(); | ||
ensureSearchTaskIsCancelled(nodeIdToName::get); | ||
|
||
disableBlocks(plugins); | ||
expectThrows(CancellationException.class, future::get); | ||
} | ||
} | ||
|
||
public void testAutomaticCancellationDuringFetchPhase() throws Exception { | ||
List<ScriptedBlockPlugin> plugins = initBlockFactory(); | ||
indexTestData(); | ||
|
||
HttpAsyncClientBuilder clientBuilder = HttpAsyncClientBuilder.create(); | ||
|
||
try (CloseableHttpAsyncClient httpClient = clientBuilder.build()) { | ||
httpClient.start(); | ||
|
||
List<HttpHost> hosts = new ArrayList<>(); | ||
Map<String, String> nodeIdToName = new HashMap<>(); | ||
readNodesInfo(hosts, nodeIdToName); | ||
|
||
SearchSourceBuilder searchSource = new SearchSourceBuilder().scriptField("test_field", | ||
new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())); | ||
|
||
HttpPost httpPost = new HttpPost("/test/_search"); | ||
httpPost.setEntity(new NStringEntity(Strings.toString(searchSource), ContentType.APPLICATION_JSON)); | ||
|
||
Future<HttpResponse> future = httpClient.execute(randomFrom(hosts), httpPost, null); | ||
awaitForBlock(plugins); | ||
|
||
httpPost.abort(); | ||
ensureSearchTaskIsCancelled(nodeIdToName::get); | ||
|
||
disableBlocks(plugins); | ||
expectThrows(CancellationException.class, future::get); | ||
} | ||
} | ||
|
||
private static void readNodesInfo(List<HttpHost> hosts, Map<String, String> nodeIdToName) { | ||
NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().get(); | ||
assertFalse(nodesInfoResponse.hasFailures()); | ||
for (NodeInfo node : nodesInfoResponse.getNodes()) { | ||
if (node.getHttp() != null) { | ||
TransportAddress publishAddress = node.getHttp().address().publishAddress(); | ||
InetSocketAddress address = publishAddress.address(); | ||
hosts.add(new HttpHost(NetworkAddress.format(address.getAddress()), address.getPort(), "http")); | ||
} | ||
nodeIdToName.put(node.getNode().getId(), node.getNode().getName()); | ||
} | ||
} | ||
|
||
private static void ensureSearchTaskIsCancelled(Function<String, String> nodeIdToName) { | ||
SetOnce<TaskInfo> searchTask = new SetOnce<>(); | ||
ListTasksResponse listTasksResponse = client().admin().cluster().prepareListTasks().get(); | ||
for (TaskInfo task : listTasksResponse.getTasks()) { | ||
if (task.getAction().equals(SearchAction.NAME)) { | ||
searchTask.set(task); | ||
} | ||
} | ||
assertNotNull(searchTask.get()); | ||
TaskId taskId = searchTask.get().getTaskId(); | ||
String nodeName = nodeIdToName.apply(taskId.getNodeId()); | ||
TaskManager taskManager = internalCluster().getInstance(TransportService.class, nodeName).getTaskManager(); | ||
Task task = taskManager.getTask(taskId.getId()); | ||
assertThat(task, instanceOf(CancellableTask.class)); | ||
assertTrue(((CancellableTask)task).isCancelled()); | ||
} | ||
|
||
private void indexTestData() { | ||
for (int i = 0; i < 5; i++) { | ||
// Make sure we have a few segments | ||
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); | ||
for (int j = 0; j < 20; j++) { | ||
bulkRequestBuilder.add(client().prepareIndex("test", "type", Integer.toString(i * 5 + j)).setSource("field", "value")); | ||
} | ||
assertNoFailures(bulkRequestBuilder.get()); | ||
} | ||
} | ||
|
||
private List<ScriptedBlockPlugin> initBlockFactory() { | ||
List<ScriptedBlockPlugin> plugins = new ArrayList<>(); | ||
for (PluginsService pluginsService : internalCluster().getDataNodeInstances(PluginsService.class)) { | ||
plugins.addAll(pluginsService.filterPlugins(ScriptedBlockPlugin.class)); | ||
} | ||
for (ScriptedBlockPlugin plugin : plugins) { | ||
plugin.reset(); | ||
plugin.enableBlock(); | ||
} | ||
return plugins; | ||
} | ||
|
||
private void awaitForBlock(List<ScriptedBlockPlugin> plugins) throws Exception { | ||
int numberOfShards = getNumShards("test").numPrimaries; | ||
assertBusy(() -> { | ||
int numberOfBlockedPlugins = 0; | ||
for (ScriptedBlockPlugin plugin : plugins) { | ||
numberOfBlockedPlugins += plugin.hits.get(); | ||
} | ||
logger.info("The plugin blocked on {} out of {} shards", numberOfBlockedPlugins, numberOfShards); | ||
assertThat(numberOfBlockedPlugins, greaterThan(0)); | ||
}); | ||
} | ||
|
||
private void disableBlocks(List<ScriptedBlockPlugin> plugins) throws Exception { | ||
for (ScriptedBlockPlugin plugin : plugins) { | ||
plugin.disableBlock(); | ||
} | ||
} | ||
|
||
public static class ScriptedBlockPlugin extends MockScriptPlugin { | ||
static final String SCRIPT_NAME = "search_block"; | ||
|
||
private final AtomicInteger hits = new AtomicInteger(); | ||
|
||
private final AtomicBoolean shouldBlock = new AtomicBoolean(true); | ||
|
||
void reset() { | ||
hits.set(0); | ||
} | ||
|
||
void disableBlock() { | ||
shouldBlock.set(false); | ||
} | ||
|
||
void enableBlock() { | ||
shouldBlock.set(true); | ||
} | ||
|
||
@Override | ||
public Map<String, Function<Map<String, Object>, Object>> pluginScripts() { | ||
return Collections.singletonMap(SCRIPT_NAME, params -> { | ||
LeafFieldsLookup fieldsLookup = (LeafFieldsLookup) params.get("_fields"); | ||
LogManager.getLogger(SearchHttpCancellationIT.class).info("Blocking on the document {}", fieldsLookup.get("_id")); | ||
hits.incrementAndGet(); | ||
try { | ||
awaitBusy(() -> shouldBlock.get() == false); | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
return true; | ||
}); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.rest.action.search; | ||
|
||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.ActionRequest; | ||
import org.elasticsearch.action.ActionResponse; | ||
import org.elasticsearch.action.ActionType; | ||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; | ||
import org.elasticsearch.client.Client; | ||
import org.elasticsearch.client.node.NodeClient; | ||
import org.elasticsearch.http.HttpChannel; | ||
import org.elasticsearch.tasks.Task; | ||
import org.elasticsearch.tasks.TaskId; | ||
|
||
import java.util.HashSet; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
|
||
/** | ||
* This class executes a request and associates the corresponding {@link Task} with the {@link HttpChannel} that it was originated from, | ||
* so that the tasks associated with a certain channel get cancelled when the underlying connection gets closed. | ||
*/ | ||
public final class HttpChannelTaskHandler { | ||
|
||
private static final HttpChannelTaskHandler INSTANCE = new HttpChannelTaskHandler(); | ||
|
||
final Map<HttpChannel, CloseListener> httpChannels = new ConcurrentHashMap<>(); | ||
|
||
private HttpChannelTaskHandler() { | ||
} | ||
|
||
public static HttpChannelTaskHandler get() { | ||
return INSTANCE; | ||
} | ||
|
||
<Response extends ActionResponse> void execute(NodeClient client, HttpChannel httpChannel, ActionRequest request, | ||
ActionType<Response> actionType, ActionListener<Response> listener) { | ||
|
||
CloseListener closeListener = httpChannels.computeIfAbsent(httpChannel, channel -> new CloseListener(client)); | ||
TaskHolder taskHolder = new TaskHolder(); | ||
Task task = client.executeLocally(actionType, request, | ||
new ActionListener<>() { | ||
@Override | ||
public void onResponse(Response searchResponse) { | ||
try { | ||
closeListener.unregisterTask(taskHolder); | ||
} finally { | ||
listener.onResponse(searchResponse); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
try { | ||
closeListener.unregisterTask(taskHolder); | ||
} finally { | ||
listener.onFailure(e); | ||
} | ||
} | ||
}); | ||
closeListener.registerTask(taskHolder, new TaskId(client.getLocalNodeId(), task.getId())); | ||
closeListener.maybeRegisterChannel(httpChannel); | ||
} | ||
|
||
public int getNumChannels() { | ||
return httpChannels.size(); | ||
} | ||
|
||
final class CloseListener implements ActionListener<Void> { | ||
private final Client client; | ||
private final AtomicReference<HttpChannel> channel = new AtomicReference<>(); | ||
final Set<TaskId> taskIds = new HashSet<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can this be private too? |
||
|
||
CloseListener(Client client) { | ||
this.client = client; | ||
} | ||
|
||
void maybeRegisterChannel(HttpChannel httpChannel) { | ||
if (channel.compareAndSet(null, httpChannel)) { | ||
//In case the channel is already closed when we register the listener, the listener will be immediately executed which will | ||
//remove the channel from the map straight-away. That is why we first create the CloseListener and later we associate it | ||
//with the channel. This guarantees that the close listener is already in the map when the it gets registered to its | ||
//corresponding channel, hence it is always found in the map when it gets invoked if the channel gets closed. | ||
httpChannel.addCloseListener(this); | ||
} | ||
} | ||
|
||
synchronized void registerTask(TaskHolder taskHolder, TaskId taskId) { | ||
taskHolder.taskId = taskId; | ||
if (taskHolder.completed == false) { | ||
this.taskIds.add(taskId); | ||
} | ||
} | ||
|
||
synchronized void unregisterTask(TaskHolder taskHolder) { | ||
if (taskHolder.taskId != null) { | ||
this.taskIds.remove(taskHolder.taskId); | ||
} | ||
taskHolder.completed = true; | ||
} | ||
|
||
@Override | ||
public synchronized void onResponse(Void aVoid) { | ||
//When the channel gets closed it won't be reused: we can remove it from the map and forget about it. | ||
httpChannels.remove(channel.get()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we assert that it was in the map? |
||
for (TaskId previousTaskId : taskIds) { | ||
//TODO what thread context should this be run on? | ||
CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should run under the system context |
||
cancelTasksRequest.setTaskId(previousTaskId); | ||
//We don't wait for cancel tasks to come back. Task cancellation is just best effort. | ||
//Note that cancel tasks fails if the user sending the search request does not have the permissions to call it. | ||
client.admin().cluster().cancelTasks(cancelTasksRequest, ActionListener.wrap(r -> {}, e -> {})); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
onResponse(null); | ||
} | ||
} | ||
|
||
private static class TaskHolder { | ||
private TaskId taskId; | ||
private boolean completed = false; | ||
} | ||
} |
Review comment (hidden by default): can't you just make `INSTANCE` public instead of exposing it through `get()`?