Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TransportGetTaskAction: Wait for the task asynchronously #93375

Merged
merged 25 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
a3c20f3
TransportGetTaskAction: Wait for the task asynchronously
arteam Jan 30, 2023
4633906
Remove unused method waitForTaskCompletion
arteam Jan 31, 2023
a8eba99
Don't fork waitedForCompletionListener
arteam Jan 31, 2023
8d79499
Remove unused imports
arteam Jan 31, 2023
5469643
Merge branch 'main' into transport-get-task-action-async
elasticmachine Jan 31, 2023
d80ffb0
Update server/src/main/java/org/elasticsearch/action/admin/cluster/no…
arteam Jan 31, 2023
d39b976
Inherit the warning from the previous task
arteam Feb 1, 2023
8d0b6a1
Merge remote-tracking branch 'origin/main' into transport-get-task-ac…
arteam Feb 1, 2023
0501970
Propagate failure if task has been cancelled due timeout
arteam Feb 1, 2023
1445188
Ignore reindex/30_search/Sorting deprecated wait_for_completion false
arteam Feb 1, 2023
c0f3883
Merge branch 'main' into transport-get-task-action-async
elasticmachine Feb 1, 2023
8b67372
Update docs/changelog/93375.yaml
arteam Feb 1, 2023
9bd2539
Update server/src/main/java/org/elasticsearch/action/admin/cluster/no…
arteam Feb 1, 2023
0978a52
Improve naming
arteam Feb 1, 2023
7db58ea
Add missed import
arteam Feb 2, 2023
395c808
Revert keeping warnings for getTask
arteam Feb 2, 2023
65be7c5
Merge remote-tracking branch 'origin/main' into transport-get-task-ac…
arteam Feb 2, 2023
9f6a7bb
Unmute blocking the thread pool
arteam Feb 2, 2023
55f347f
Revert "Unmute blocking the thread pool"
arteam Feb 2, 2023
9073672
Merge remote-tracking branch 'origin/main' into transport-get-task-ac…
arteam Feb 2, 2023
b9f62d1
Merge remote-tracking branch 'origin/main' into transport-get-task-ac…
arteam Feb 2, 2023
10e8958
Merge remote-tracking branch 'origin/main' into transport-get-task-ac…
arteam Feb 6, 2023
4c5a55f
Revert
DaveCTurner Feb 6, 2023
f50e739
Stop preserving response headers
DaveCTurner Feb 6, 2023
b3f415a
Remove bogus test comment
DaveCTurner Feb 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/93375.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 93375
summary: '`TransportGetTaskAction:` Wait for the task asynchronously'
area: Task Management
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,6 @@ public void testWaitForCompletion() throws Exception {
assertNull(threadContext.getResponseHeaders().get(TestTransportAction.HEADER_NAME));
}));

// briefly fill up the generic pool so that (a) we know the wait has started and (b) we know it's not blocking
// flushThreadPool(threadPool, ThreadPool.Names.GENERIC); // TODO it _is_ blocking right now!!, unmute this in #93375

assertFalse(listWaitFuture.isDone());
assertFalse(testActionFuture.isDone());
barrier.await(10, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,27 @@
package org.elasticsearch.action.admin.cluster.node.tasks.get;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.ListenableActionFuture;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.tasks.RemovedTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
Expand All @@ -39,8 +43,9 @@

import java.io.IOException;

import static java.util.Objects.requireNonNullElse;
import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
import static org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction.waitForCompletionTimeout;
import static org.elasticsearch.core.TimeValue.timeValueSeconds;

/**
* ActionType to get a single task. If the task isn't running then it'll try to fetch the task's status from the index of finished tasks.
Expand All @@ -53,6 +58,9 @@
* </ul>
*/
public class TransportGetTaskAction extends HandledTransportAction<GetTaskRequest, GetTaskResponse> {

private static final TimeValue DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT = timeValueSeconds(30);

private final ThreadPool threadPool;
private final ClusterService clusterService;
private final TransportService transportService;
Expand Down Expand Up @@ -130,19 +138,47 @@ void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListene
getFinishedTaskFromIndex(thisTask, request, listener);
} else {
if (request.getWaitForCompletion()) {
// Shift to the generic thread pool and let it wait for the task to complete so we don't block any important threads.
threadPool.generic().execute(new AbstractRunnable() {
@Override
protected void doRun() {
taskManager.waitForTaskCompletion(runningTask, waitForCompletionTimeout(request.getTimeout()));
arteam marked this conversation as resolved.
Show resolved Hide resolved
waitedForCompletion(thisTask, request, runningTask.taskInfo(clusterService.localNode().getId(), true), listener);
final ListenableActionFuture<Void> future = new ListenableActionFuture<>();
RemovedTaskListener removedTaskListener = task -> {
if (task.equals(runningTask)) {
future.onResponse(null);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
};
taskManager.registerRemovedTaskListener(removedTaskListener);
arteam marked this conversation as resolved.
Show resolved Hide resolved
// Check if the task had finished before we registered the listener, so we wouldn't wait
// for an event that would never come
if (taskManager.getTask(request.getTaskId().getId()) == null) {
future.onResponse(null);
}
final ActionListener<Void> waitedForCompletionListener = ActionListener.runBefore(
ActionListener.wrap(
v -> waitedForCompletion(
thisTask,
request,
runningTask.taskInfo(clusterService.localNode().getId(), true),
listener
),
listener::onFailure
),
() -> taskManager.unregisterRemovedTaskListener(removedTaskListener)
);
if (future.isDone()) {
// The task has already finished, we can run the completion listener in the same thread
waitedForCompletionListener.onResponse(null);
} else {
future.addListener(
new ContextPreservingActionListener<>(
threadPool.getThreadContext().newRestorableContext(false),
waitedForCompletionListener
)
);
var failByTimeout = threadPool.schedule(
() -> future.onFailure(new ElasticsearchTimeoutException("Timed out waiting for completion of task")),
requireNonNullElse(request.getTimeout(), DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT),
ThreadPool.Names.SAME
);
future.addListener(ActionListener.wrap(failByTimeout::cancel));
}
} else {
TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true);
listener.onResponse(new GetTaskResponse(new TaskResult(false, info)));
Expand Down
19 changes: 0 additions & 19 deletions server/src/main/java/org/elasticsearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
Expand Down Expand Up @@ -547,23 +545,6 @@ public void applyClusterState(ClusterChangedEvent event) {
lastDiscoveryNodes = event.state().getNodes();
}

/**
 * Blocks the calling thread, waiting for the task to vanish from the TaskManager.
 * <p>
 * The task is looked up by id via {@code getTask}; while it is still present the thread sleeps for
 * {@code WAIT_FOR_COMPLETION_POLL} and then checks again, until the deadline is reached.
 *
 * @param task         the task to wait for
 * @param untilInNanos the deadline, expressed on the {@link System#nanoTime()} clock
 * @throws ElasticsearchTimeoutException if the deadline passes while the task is still registered
 * @throws ElasticsearchException        if the calling thread is interrupted while sleeping between polls
 */
public void waitForTaskCompletion(Task task, long untilInNanos) {
    // Compare via subtraction so the check stays correct even if nanoTime() wraps around.
    while (System.nanoTime() - untilInNanos < 0) {
        if (getTask(task.getId()) == null) {
            return;
        }
        try {
            Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis());
        } catch (InterruptedException e) {
            // Thread.sleep clears the interrupt status when it throws; restore it so that code further up
            // the stack (e.g. the executor running this thread) can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new ElasticsearchException("Interrupted waiting for completion of [{}]", e, task);
        }
    }
    throw new ElasticsearchTimeoutException("Timed out waiting for completion of [{}]", task);
}

private static class CancellableTaskHolder {
private final CancellableTask task;
private boolean finished = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,6 @@ public Task unregister(Task task) {
return removedTask;
}

/**
 * Tells every registered mock listener that a wait on {@code task} is starting, then delegates the
 * actual waiting to the superclass implementation.
 *
 * @param task         the task being waited for
 * @param untilInNanos passed through unchanged to the superclass implementation
 */
@Override
public void waitForTaskCompletion(Task task, long untilInNanos) {
    // A listener that throws is only logged: it must not stop the remaining listeners or the wait itself.
    for (final MockTaskManagerListener mockListener : listeners) {
        try {
            mockListener.waitForTaskCompletion(task);
        } catch (Exception failure) {
            logger.warn(
                () -> format("failed to notify task manager listener about waitForTaskCompletion the task with id %s", task.getId()),
                failure
            );
        }
    }
    super.waitForTaskCompletion(task, untilInNanos);
}

@Override
public void registerRemovedTaskListener(RemovedTaskListener removedTaskListener) {
for (MockTaskManagerListener listener : listeners) {
Expand Down