Skip to content

Commit

Permalink
Different wait for cancelled tasks (#94992)
Browse files Browse the repository at this point in the history
* Different wait for cancelled tasks
* Reschedule cancellable task. So it gives space in the thread pool for cancellation requests

Fixes #94989
  • Loading branch information
kingherc authored Apr 4, 2023
1 parent 7a78203 commit beb7e53
Showing 1 changed file with 30 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
Expand Down Expand Up @@ -45,7 +44,6 @@
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolStats;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import org.elasticsearch.transport.SendRequestTransportException;
import org.elasticsearch.transport.Transport;
Expand Down Expand Up @@ -383,16 +381,12 @@ public void testChildrenTasksCancelledOnTimeout() throws Exception {
ActionFuture<TestResponse> rootTaskFuture = client().execute(TransportTestAction.ACTION, rootRequest);
allowEntireRequest(rootRequest);
waitForRootTask(rootTaskFuture, true);
assertBusy(() -> {
for (DiscoveryNode node : nodes) {
TransportService transportService = internalCluster().getInstance(TransportService.class, node.getName());
for (ThreadPoolStats.Stats stat : transportService.getThreadPool().stats()) {
assertEquals(0, stat.getActive());
assertEquals(0, stat.getQueue());
}
}
}, 60L, TimeUnit.SECONDS);
ensureBansAndCancellationsConsistency();

// Make sure all descendent requests have completed
for (TestRequest subRequest : rootRequest.descendants()) {
safeAwait(completedLatches.get(subRequest));
}
}

static TaskId getRootTaskId(TestRequest request) throws Exception {
Expand Down Expand Up @@ -536,6 +530,30 @@ public TransportTestAction(TransportService transportService, NodeClient client,
this.client = client;
}

private void schedule(Task task, TestRequest request, TimeValue delay, ActionListener<TestResponse> listener) {
transportService.getThreadPool().schedule(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
protected void doRun() throws Exception {
assertTrue(beforeExecuteLatches.get(request).await(60, TimeUnit.SECONDS));
if (request.timeout) {
// Repeat work until cancelled
if (((CancellableTask) task).isCancelled() == false) {
schedule(task, request, TimeValue.timeValueMillis(10), listener);
return;
}
} else {
((CancellableTask) task).ensureNotCancelled();
}
listener.onResponse(new TestResponse());
}
}, delay, ThreadPool.Names.GENERIC);
}

@Override
protected void doExecute(Task task, TestRequest request, ActionListener<TestResponse> listener) {
arrivedLatches.get(request).countDown();
Expand All @@ -544,20 +562,7 @@ protected void doExecute(Task task, TestRequest request, ActionListener<TestResp
subRequests.size() + 1,
listener.map(r -> new TestResponse())
);
transportService.getThreadPool().generic().execute(ActionRunnable.supply(groupedListener, () -> {
assertTrue(beforeExecuteLatches.get(request).await(60, TimeUnit.SECONDS));
if (request.timeout) {
// Simulate working until cancelled
while (((CancellableTask) task).isCancelled() == false) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {}
}
} else {
((CancellableTask) task).ensureNotCancelled();
}
return new TestResponse();
}));
schedule(task, request, TimeValue.ZERO, groupedListener);
for (TestRequest subRequest : subRequests) {
TaskId parentTaskId = new TaskId(client.getLocalNodeId(), task.getId());
startSubTask(parentTaskId, subRequest, groupedListener);
Expand Down

0 comments on commit beb7e53

Please sign in to comment.