From 7bd5613e2161fe69c6f9bc5c94b50a1b58863c89 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Mon, 6 Feb 2023 16:32:38 +0100 Subject: [PATCH] TransportGetTaskAction: Wait for the task asynchronously (#93375) Wait for the requested task asynchronously in a similar fashion to TransportListTasksAction from #90977 See #90977 --------- Co-authored-by: David Turner --- docs/changelog/93375.yaml | 5 ++ .../admin/cluster/tasks/ListTasksIT.java | 3 - .../tasks/get/TransportGetTaskAction.java | 64 +++++++++++++++---- .../org/elasticsearch/tasks/TaskManager.java | 19 ------ .../test/tasks/MockTaskManager.java | 15 ----- 5 files changed, 55 insertions(+), 51 deletions(-) create mode 100644 docs/changelog/93375.yaml diff --git a/docs/changelog/93375.yaml b/docs/changelog/93375.yaml new file mode 100644 index 0000000000000..63114927df96a --- /dev/null +++ b/docs/changelog/93375.yaml @@ -0,0 +1,5 @@ +pr: 93375 +summary: '`TransportGetTaskAction:` Wait for the task asynchronously' +area: Task Management +type: enhancement +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/tasks/ListTasksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/tasks/ListTasksIT.java index e3b12b7b9ebb4..f4b1903bba671 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/tasks/ListTasksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/tasks/ListTasksIT.java @@ -127,9 +127,6 @@ public void testWaitForCompletion() throws Exception { assertNull(threadContext.getResponseHeaders().get(TestTransportAction.HEADER_NAME)); })); - // briefly fill up the generic pool so that (a) we know the wait has started and (b) we know it's not blocking - // flushThreadPool(threadPool, ThreadPool.Names.GENERIC); // TODO it _is_ blocking right now!!, unmute this in #93375 - assertFalse(listWaitFuture.isDone()); assertFalse(testActionFuture.isDone()); barrier.await(10, 
TimeUnit.SECONDS); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java index 32ca0a59a0b1b..cf623c37f721f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.admin.cluster.node.tasks.get; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; @@ -16,16 +17,19 @@ import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.ListenableActionFuture; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.tasks.RemovedTaskListener; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; @@ -39,8 +43,9 @@ import java.io.IOException; +import static 
java.util.Objects.requireNonNullElse; import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN; -import static org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction.waitForCompletionTimeout; +import static org.elasticsearch.core.TimeValue.timeValueSeconds; /** * ActionType to get a single task. If the task isn't running then it'll try to request the status from request index. @@ -53,6 +58,9 @@ * */ public class TransportGetTaskAction extends HandledTransportAction { + + private static final TimeValue DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT = timeValueSeconds(30); + private final ThreadPool threadPool; private final ClusterService clusterService; private final TransportService transportService; @@ -130,19 +138,47 @@ void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListene getFinishedTaskFromIndex(thisTask, request, listener); } else { if (request.getWaitForCompletion()) { - // Shift to the generic thread pool and let it wait for the task to complete so we don't block any important threads. 
- threadPool.generic().execute(new AbstractRunnable() { - @Override - protected void doRun() { - taskManager.waitForTaskCompletion(runningTask, waitForCompletionTimeout(request.getTimeout())); - waitedForCompletion(thisTask, request, runningTask.taskInfo(clusterService.localNode().getId(), true), listener); + final ListenableActionFuture future = new ListenableActionFuture<>(); + RemovedTaskListener removedTaskListener = task -> { + if (task.equals(runningTask)) { + future.onResponse(null); } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + }; + taskManager.registerRemovedTaskListener(removedTaskListener); + // Check if the task had finished before we registered the listener, so we wouldn't wait + // for an event that would never come + if (taskManager.getTask(request.getTaskId().getId()) == null) { + future.onResponse(null); + } + final ActionListener waitedForCompletionListener = ActionListener.runBefore( + ActionListener.wrap( + v -> waitedForCompletion( + thisTask, + request, + runningTask.taskInfo(clusterService.localNode().getId(), true), + listener + ), + listener::onFailure + ), + () -> taskManager.unregisterRemovedTaskListener(removedTaskListener) + ); + if (future.isDone()) { + // The task has already finished, we can run the completion listener in the same thread + waitedForCompletionListener.onResponse(null); + } else { + future.addListener( + new ContextPreservingActionListener<>( + threadPool.getThreadContext().newRestorableContext(false), + waitedForCompletionListener + ) + ); + var failByTimeout = threadPool.schedule( + () -> future.onFailure(new ElasticsearchTimeoutException("Timed out waiting for completion of task")), + requireNonNullElse(request.getTimeout(), DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT), + ThreadPool.Names.SAME + ); + future.addListener(ActionListener.wrap(failByTimeout::cancel)); + } } else { TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true); listener.onResponse(new 
GetTaskResponse(new TaskResult(false, info))); diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java index eded3b1cfb67c..8fc38e520d0d5 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -12,8 +12,6 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.SetOnce; import org.elasticsearch.Assertions; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; @@ -547,23 +545,6 @@ public void applyClusterState(ClusterChangedEvent event) { lastDiscoveryNodes = event.state().getNodes(); } - /** - * Blocks the calling thread, waiting for the task to vanish from the TaskManager. - */ - public void waitForTaskCompletion(Task task, long untilInNanos) { - while (System.nanoTime() - untilInNanos < 0) { - if (getTask(task.getId()) == null) { - return; - } - try { - Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis()); - } catch (InterruptedException e) { - throw new ElasticsearchException("Interrupted waiting for completion of [{}]", e, task); - } - } - throw new ElasticsearchTimeoutException("Timed out waiting for completion of [{}]", task); - } - private static class CancellableTaskHolder { private final CancellableTask task; private boolean finished = false; diff --git a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java index d9dc13c2a7ce6..0e89df35eca7a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java +++ b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java @@ -78,21 +78,6 @@ public Task unregister(Task task) { return 
removedTask; } - @Override - public void waitForTaskCompletion(Task task, long untilInNanos) { - for (MockTaskManagerListener listener : listeners) { - try { - listener.waitForTaskCompletion(task); - } catch (Exception e) { - logger.warn( - () -> format("failed to notify task manager listener about waitForTaskCompletion the task with id %s", task.getId()), - e - ); - } - } - super.waitForTaskCompletion(task, untilInNanos); - } - @Override public void registerRemovedTaskListener(RemovedTaskListener removedTaskListener) { for (MockTaskManagerListener listener : listeners) {