Skip to content

Commit

Permalink
do not fork to generic thread pool for non-forking response handler w…
Browse files Browse the repository at this point in the history
…hen handling exception
  • Loading branch information
ywangd committed Oct 10, 2024
1 parent d93212d commit 37d520c
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void onResponse(@Nullable T result) {

@Override
public void onFailure(Exception e) {
assert ThreadPool.Names.GENERIC.equals(EsExecutors.executorName(Thread.currentThread())) || assertCompleteAllowed();
assert assertCompleteAllowed();
if (sync.setException(Objects.requireNonNull(e))) {
done(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1059,8 +1059,9 @@ private Executor getInternalSendExceptionExecutor(Executor handlerExecutor) {
if (lifecycle.stoppedOrClosed()) {
// too late to try and dispatch anywhere else, let's just use the calling thread
return EsExecutors.DIRECT_EXECUTOR_SERVICE;
} else if (handlerExecutor == EsExecutors.DIRECT_EXECUTOR_SERVICE) {
// if the handler is non-forking then dispatch to GENERIC to avoid a possible stack overflow
} else if (handlerExecutor == EsExecutors.DIRECT_EXECUTOR_SERVICE && enableStackOverflowAvoidance) {
// If the handler is non-forking and stack overflow protection is enabled then dispatch to GENERIC
// Otherwise we let the handler deal with any potential stack overflow (this is the default)
return threadPool.generic();
} else {
return handlerExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,13 @@ public void testInternalSendExceptionForksToHandlerExecutor() {
}
}

public void testInternalSendExceptionForksToGenericIfHandlerDoesNotFork() {
try (var nodeA = new TestNode("node-A")) {
public void testInternalSendExceptionForksToGenericIfHandlerDoesNotForkAndStackOverflowProtectionEnabled() {
try (
var nodeA = new TestNode(
"node-A",
Settings.builder().put(TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE.getKey(), true).build()
)
) {
final var future = new PlainActionFuture<TransportResponse.Empty>();
nodeA.transportService.sendRequest(
nodeA.getThrowingConnection(),
Expand All @@ -165,6 +170,31 @@ public void testInternalSendExceptionForksToGenericIfHandlerDoesNotFork() {

assertEquals("simulated exception in sendRequest", getSendRequestException(future, IOException.class).getMessage());
}
assertWarnings(
"[transport.enable_stack_protection] setting was deprecated in Elasticsearch and will be removed in a future release."
);
}

public void testInternalSendExceptionWithNonForkingResponseHandlerCanCompleteFutureWaiterFromGenericThread() throws Exception {
try (var nodeA = new TestNode("node-A")) {
final var future = new PlainActionFuture<TransportResponse.Empty>();
final var latch = new CountDownLatch(1);
nodeA.transportService.getThreadPool().generic().execute(() -> {
assertEquals("simulated exception in sendRequest", getSendRequestException(future, IOException.class).getMessage());
latch.countDown();
});
nodeA.transportService.sendRequest(
nodeA.getThrowingConnection(),
TestNode.randomActionName(random()),
new EmptyRequest(),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(future.delegateResponse((l, e) -> {
assertThat(Thread.currentThread().getName(), startsWith("TEST-"));
l.onFailure(e);
}), unusedReader(), EsExecutors.DIRECT_EXECUTOR_SERVICE)
);
assertBusy(() -> assertTrue(future.isDone()));
}
}

public void testInternalSendExceptionForcesExecutionOnHandlerExecutor() {
Expand Down

0 comments on commit 37d520c

Please sign in to comment.