-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TransportGetTaskAction: Wait for the task asynchronously #93375
Changes from 11 commits
a3c20f3
4633906
a8eba99
8d79499
5469643
d80ffb0
d39b976
8d0b6a1
0501970
1445188
c0f3883
8b67372
9bd2539
0978a52
7db58ea
395c808
65be7c5
9f6a7bb
55f347f
9073672
b9f62d1
10e8958
4c5a55f
f50e739
b3f415a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -9,6 +9,7 @@ | |||||||||
package org.elasticsearch.action.admin.cluster.node.tasks.get; | ||||||||||
|
||||||||||
import org.elasticsearch.ElasticsearchException; | ||||||||||
import org.elasticsearch.ElasticsearchTimeoutException; | ||||||||||
import org.elasticsearch.ExceptionsHelper; | ||||||||||
import org.elasticsearch.ResourceNotFoundException; | ||||||||||
import org.elasticsearch.action.ActionListener; | ||||||||||
|
@@ -17,15 +18,17 @@ | |||||||||
import org.elasticsearch.action.get.GetResponse; | ||||||||||
import org.elasticsearch.action.support.ActionFilters; | ||||||||||
import org.elasticsearch.action.support.HandledTransportAction; | ||||||||||
import org.elasticsearch.action.support.ListenableActionFuture; | ||||||||||
import org.elasticsearch.client.internal.Client; | ||||||||||
import org.elasticsearch.client.internal.OriginSettingClient; | ||||||||||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||||||||||
import org.elasticsearch.cluster.service.ClusterService; | ||||||||||
import org.elasticsearch.common.inject.Inject; | ||||||||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable; | ||||||||||
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; | ||||||||||
import org.elasticsearch.common.xcontent.XContentHelper; | ||||||||||
import org.elasticsearch.core.TimeValue; | ||||||||||
import org.elasticsearch.index.IndexNotFoundException; | ||||||||||
import org.elasticsearch.tasks.RemovedTaskListener; | ||||||||||
import org.elasticsearch.tasks.Task; | ||||||||||
import org.elasticsearch.tasks.TaskId; | ||||||||||
import org.elasticsearch.tasks.TaskInfo; | ||||||||||
|
@@ -39,8 +42,9 @@ | |||||||||
|
||||||||||
import java.io.IOException; | ||||||||||
|
||||||||||
import static java.util.Objects.requireNonNullElse; | ||||||||||
import static org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN; | ||||||||||
import static org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction.waitForCompletionTimeout; | ||||||||||
import static org.elasticsearch.core.TimeValue.timeValueSeconds; | ||||||||||
|
||||||||||
/** | ||||||||||
* ActionType to get a single task. If the task isn't running then it'll try to request the status from request index. | ||||||||||
|
@@ -53,6 +57,9 @@ | |||||||||
* </ul> | ||||||||||
*/ | ||||||||||
public class TransportGetTaskAction extends HandledTransportAction<GetTaskRequest, GetTaskResponse> { | ||||||||||
|
||||||||||
private static final TimeValue DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT = timeValueSeconds(30); | ||||||||||
|
||||||||||
private final ThreadPool threadPool; | ||||||||||
private final ClusterService clusterService; | ||||||||||
private final TransportService transportService; | ||||||||||
|
@@ -130,19 +137,42 @@ void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListene | |||||||||
getFinishedTaskFromIndex(thisTask, request, listener); | ||||||||||
} else { | ||||||||||
if (request.getWaitForCompletion()) { | ||||||||||
// Shift to the generic thread pool and let it wait for the task to complete so we don't block any important threads. | ||||||||||
threadPool.generic().execute(new AbstractRunnable() { | ||||||||||
@Override | ||||||||||
protected void doRun() { | ||||||||||
taskManager.waitForTaskCompletion(runningTask, waitForCompletionTimeout(request.getTimeout())); | ||||||||||
arteam marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
waitedForCompletion(thisTask, request, runningTask.taskInfo(clusterService.localNode().getId(), true), listener); | ||||||||||
} | ||||||||||
|
||||||||||
@Override | ||||||||||
public void onFailure(Exception e) { | ||||||||||
listener.onFailure(e); | ||||||||||
final ListenableActionFuture<Void> future = new ListenableActionFuture<>(); | ||||||||||
RemovedTaskListener removedTaskListener = task -> { | ||||||||||
if (task.equals(runningTask)) { | ||||||||||
future.onResponse(null); | ||||||||||
} | ||||||||||
}); | ||||||||||
}; | ||||||||||
taskManager.registerRemovedTaskListener(removedTaskListener); | ||||||||||
arteam marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
// Check if the task had finished before we registered the listener, | ||||||||||
// so we wouldn't wait for a task removal event that would never come | ||||||||||
if (taskManager.getTask(request.getTaskId().getId()) == null) { | ||||||||||
future.onResponse(null); | ||||||||||
} | ||||||||||
final ActionListener<Void> waitedForCompletionListener = ActionListener.runBefore( | ||||||||||
ActionListener.wrap( | ||||||||||
v -> waitedForCompletion( | ||||||||||
thisTask, | ||||||||||
request, | ||||||||||
runningTask.taskInfo(clusterService.localNode().getId(), true), | ||||||||||
listener | ||||||||||
), | ||||||||||
listener::onFailure | ||||||||||
), | ||||||||||
() -> taskManager.unregisterRemovedTaskListener(removedTaskListener) | ||||||||||
); | ||||||||||
if (future.isDone()) { | ||||||||||
// The task has already finished, we can run the completion listener in the same thread | ||||||||||
waitedForCompletionListener.onResponse(null); | ||||||||||
} else { | ||||||||||
future.addListener(waitedForCompletionListener); | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ... like this
Suggested change
We also need to do this for the list-tasks action, and ideally we'd have a test for it there too. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah sorry this was wrong, when we wrap it like this the response headers will still propagate. We have to do it like this to drop the response headers:
|
||||||||||
var cancellable = threadPool.schedule( | ||||||||||
() -> future.onFailure(new ElasticsearchTimeoutException("Timed out waiting for completion of task")), | ||||||||||
requireNonNullElse(request.getTimeout(), DEFAULT_WAIT_FOR_COMPLETION_TIMEOUT), | ||||||||||
ThreadPool.Names.SAME | ||||||||||
); | ||||||||||
future.addListener(ActionListener.wrap(cancellable::cancel)); | ||||||||||
} | ||||||||||
} else { | ||||||||||
TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true); | ||||||||||
listener.onResponse(new GetTaskResponse(new TaskResult(false, info))); | ||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like with the latest change
getTask
now inherits warnings from the previous task (not sure why it wasn't the case with a blocking call in the generic pool?) which is technically is a breaking change :(There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suspect this is because we call the
RemovedTaskListener
(and therefore complete the future) in the thread context of the task that's being removed. We need to fix that, we can't just leak context across responses like this.