Skip to content

Commit

Permalink
The FutureBase reports failure in FutureImpl emitSuccess/emitFailure …
Browse files Browse the repository at this point in the history
…methods along with the potential context dispatch. Instead this should be performed by the listener, multiple listeners will also each report error independantly and also be applied independantly on whether it required a context execution.

fixes #4032
  • Loading branch information
vietj committed Jul 26, 2021
1 parent 79119cc commit d07329c
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 9 deletions.
1 change: 0 additions & 1 deletion src/main/java/io/vertx/core/impl/future/Eventually.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
*/
package io.vertx.core.impl.future;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.impl.ContextInternal;

Expand Down
4 changes: 0 additions & 4 deletions src/main/java/io/vertx/core/impl/future/FutureBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ protected final void emitSuccess(T value, Listener<T> listener) {
ContextInternal prev = context.beginDispatch();
try {
listener.onSuccess(value);
} catch (Throwable t) {
context.reportException(t);
} finally {
context.endDispatch(prev);
}
Expand All @@ -69,8 +67,6 @@ protected final void emitFailure(Throwable cause, Listener<T> listener) {
ContextInternal prev = context.beginDispatch();
try {
listener.onFailure(cause);
} catch (Throwable t) {
context.reportException(t);
} finally {
context.endDispatch(prev);
}
Expand Down
40 changes: 36 additions & 4 deletions src/main/java/io/vertx/core/impl/future/FutureImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,15 @@ public Future<T> onSuccess(Handler<T> handler) {
addListener(new Listener<T>() {
@Override
public void onSuccess(T value) {
handler.handle(value);
try {
handler.handle(value);
} catch (Throwable t) {
if (context != null) {
context.reportException(t);
} else {
throw t;
}
}
}
@Override
public void onFailure(Throwable failure) {
Expand All @@ -105,7 +113,15 @@ public void onSuccess(T value) {
}
@Override
public void onFailure(Throwable failure) {
handler.handle(failure);
try {
handler.handle(failure);
} catch (Throwable t) {
if (context != null) {
context.reportException(t);
} else {
throw t;
}
}
}
});
return this;
Expand All @@ -121,11 +137,27 @@ public Future<T> onComplete(Handler<AsyncResult<T>> handler) {
listener = new Listener<T>() {
@Override
public void onSuccess(T value) {
handler.handle(FutureImpl.this);
try {
handler.handle(FutureImpl.this);
} catch (Throwable t) {
if (context != null) {
context.reportException(t);
} else {
throw t;
}
}
}
@Override
public void onFailure(Throwable failure) {
handler.handle(FutureImpl.this);
try {
handler.handle(FutureImpl.this);
} catch (Throwable t) {
if (context != null) {
context.reportException(t);
} else {
throw t;
}
}
}
};
}
Expand Down
71 changes: 71 additions & 0 deletions src/test/java/io/vertx/core/FutureTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@

import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.core.impl.future.PromiseInternal;
import org.junit.Test;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
Expand All @@ -25,6 +28,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

Expand Down Expand Up @@ -1535,4 +1539,71 @@ public void testCompletedFuturesContext() throws Exception {

await();
}

private final RuntimeException failure = new RuntimeException();

@Test
public void testOnCompleteReportFailureOnContext1() {
testListenersReportFailureOnContext((ctx, task) -> ctx.runOnContext(v -> task.run()), (fut, task) -> fut.onComplete(ignore -> task.run()), Promise::complete);
}

@Test
public void testOnCompleteReportFailureOnContext2() {
testListenersReportFailureOnContext((ctx, task) -> new Thread(task).start(), (fut, task) -> fut.onComplete(ignore -> task.run()), Promise::complete);
}

@Test
public void testOnCompleteReportFailureOnContext3() {
testListenersReportFailureOnContext((ctx, task) -> new Thread(task).start(), (fut, task) -> fut.onComplete(ignore -> task.run()), promise -> promise.fail("failure"));
}

@Test
public void testOnCompleteReportFailureOnContext4() {
testListenersReportFailureOnContext((ctx, task) -> new Thread(task).start(), (fut, task) -> fut.onComplete(ignore -> task.run()), promise -> promise.fail("failure"));
}

@Test
public void testOnSuccessReportFailureOnContext1() {
testListenersReportFailureOnContext((ctx, task) -> ctx.runOnContext(v -> task.run()), (fut, task) -> fut.onSuccess(ignore -> task.run()), Promise::complete);
}

@Test
public void testOnSuccessReportFailureOnContext2() {
testListenersReportFailureOnContext((ctx, task) -> new Thread(task).start(), (fut, task) -> fut.onSuccess(ignore -> task.run()), Promise::complete);
}

@Test
public void testOnFailureReportFailureOnContext1() {
testListenersReportFailureOnContext((ctx, task) -> ctx.runOnContext(v -> task.run()), (fut, task) -> fut.onFailure(ignore -> task.run()), promise -> promise.fail("failure"));
}

@Test
public void testOnFailureReportFailureOnContext2() {
testListenersReportFailureOnContext((ctx, task) -> new Thread(task).start(), (fut, task) -> fut.onFailure(ignore -> task.run()), promise -> promise.fail("failure"));
}

private <T> void testListenersReportFailureOnContext(BiConsumer<ContextInternal, Runnable> runner, BiConsumer<Future<String>, Runnable> subscriber, Consumer<Promise<?>> completer) {
testListenersReportFailureOnContext(runner, subscriber, completer, 1);
testListenersReportFailureOnContext(runner, subscriber, completer, 2);
}

private <T> void testListenersReportFailureOnContext(BiConsumer<ContextInternal, Runnable> runner, BiConsumer<Future<String>, Runnable> subscriber, Consumer<Promise<?>> completer, int size) {
ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
List<Throwable> caught = Collections.synchronizedList(new ArrayList<>());
ctx.exceptionHandler(caught::add);
runner.accept(ctx, () -> {
PromiseInternal<String> promise = ctx.promise();
for (int i = 0;i < size;i++) {
subscriber.accept(promise.future(), () -> {
throw failure;
});
}
try {
completer.accept(promise);
} catch (Exception e) {
fail("Was not expecting exception to bubble up");
}
});
waitUntil(() -> caught.size() == size && caught.get(0) == failure);
}
}

0 comments on commit d07329c

Please sign in to comment.