Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve FutureUtils.get exception handling (#50339) #50417

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,25 @@

package org.elasticsearch.action.support;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;

import java.util.concurrent.TimeUnit;

public abstract class AdapterActionFuture<T, L> extends BaseFuture<T> implements ActionFuture<T>, ActionListener<L> {

@Override
public T actionGet() {
return FutureUtils.get(this);
try {
return FutureUtils.get(this);
} catch (ElasticsearchException e) {
throw unwrapEsException(e);
}
}

@Override
Expand All @@ -51,7 +57,11 @@ public T actionGet(TimeValue timeout) {

@Override
public T actionGet(long timeout, TimeUnit unit) {
return FutureUtils.get(this, timeout, unit);
try {
return FutureUtils.get(this, timeout, unit);
} catch (ElasticsearchException e) {
throw unwrapEsException(e);
}
}

@Override
Expand All @@ -66,4 +76,11 @@ public void onFailure(Exception e) {

protected abstract T convert(L listenerResponse);

private static RuntimeException unwrapEsException(ElasticsearchException esEx) {
Throwable root = esEx.unwrapCause();
if (root instanceof RuntimeException) {
return (RuntimeException) root;
}
return new UncategorizedExecutionException("Failed execution", root);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.mapper.MapperService;
Expand Down Expand Up @@ -89,7 +89,16 @@ public void onFailure(Exception e) {
});
}

// todo: this explicit unwrap should not be necessary, but is until guessRootCause is fixed to allow wrapped non-es exception.
private static Exception unwrapException(Exception cause) {
return cause instanceof ElasticsearchException ? FutureUtils.unwrapEsException((ElasticsearchException) cause) : cause;
return cause instanceof ElasticsearchException ? unwrapEsException((ElasticsearchException) cause) : cause;
}

private static RuntimeException unwrapEsException(ElasticsearchException esEx) {
Throwable root = esEx.unwrapCause();
if (root instanceof RuntimeException) {
return (RuntimeException) root;
}
return new UncategorizedExecutionException("Failed execution", root);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
Expand Down Expand Up @@ -86,21 +85,10 @@ public static <T> T get(Future<T> future, long timeout, TimeUnit unit) {
}

public static RuntimeException rethrowExecutionException(ExecutionException e) {
if (e.getCause() instanceof ElasticsearchException) {
ElasticsearchException esEx = (ElasticsearchException) e.getCause();
return unwrapEsException(esEx);
} else if (e.getCause() instanceof RuntimeException) {
if (e.getCause() instanceof RuntimeException) {
return (RuntimeException) e.getCause();
} else {
return new UncategorizedExecutionException("Failed execution", e);
}
}

public static RuntimeException unwrapEsException(ElasticsearchException esEx) {
Throwable root = esEx.unwrapCause();
if (root instanceof ElasticsearchException || root instanceof RuntimeException) {
return (RuntimeException) root;
}
return new UncategorizedExecutionException("Failed execution", root);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteTransportException;
import org.junit.After;
import org.junit.Before;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -110,4 +112,20 @@ private void executeAction(Runnable runnable) {
runnable.run();
}
}

/**
* This test checks that we no longer unwrap exceptions when using StepListener.
*/
public void testNoUnwrap() {
StepListener<String> step = new StepListener<>();
step.onFailure(new RemoteTransportException("test", new RuntimeException("expected")));
AtomicReference<RuntimeException> exception = new AtomicReference<>();
step.whenComplete(null, e -> {
exception.set((RuntimeException) e);
});

assertEquals(RemoteTransportException.class, exception.get().getClass());
RuntimeException e = expectThrows(RuntimeException.class, () -> step.result());
assertEquals(RemoteTransportException.class, e.getClass());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@

package org.elasticsearch.action.support;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.RemoteTransportException;

import java.util.Objects;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -90,4 +94,28 @@ protected String convert(final Integer listenerResponse) {
thread.join();
}

public void testUnwrapException() {
checkUnwrap(new RemoteTransportException("test", new RuntimeException()), RuntimeException.class, RemoteTransportException.class);
checkUnwrap(new RemoteTransportException("test", new Exception()),
UncategorizedExecutionException.class, RemoteTransportException.class);
checkUnwrap(new Exception(), UncategorizedExecutionException.class, Exception.class);
checkUnwrap(new ElasticsearchException("test", new Exception()), ElasticsearchException.class, ElasticsearchException.class);
}

private void checkUnwrap(Exception exception, Class<? extends Exception> actionGetException,
Class<? extends Exception> getException) {
final AdapterActionFuture<Void, Void> adapter = new AdapterActionFuture<Void, Void>() {
@Override
protected Void convert(Void listenerResponse) {
fail();
return null;
}
};

adapter.onFailure(exception);
assertEquals(actionGetException, expectThrows(RuntimeException.class, adapter::actionGet).getClass());
assertEquals(actionGetException, expectThrows(RuntimeException.class, () -> adapter.actionGet(10, TimeUnit.SECONDS)).getClass());
assertEquals(getException, expectThrows(ExecutionException.class, () -> adapter.get()).getCause().getClass());
assertEquals(getException, expectThrows(ExecutionException.class, () -> adapter.get(10, TimeUnit.SECONDS)).getCause().getClass());
}
}