Skip to content

Commit

Permalink
Merge pull request #534 from smallrye/fix/catch-throwable
Browse files Browse the repository at this point in the history
Catch Throwable rather than RuntimeException
  • Loading branch information
jponge authored Apr 28, 2021
2 parents 808515d + a83e189 commit 380138f
Show file tree
Hide file tree
Showing 16 changed files with 51 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,9 @@ public <T> Multi<T> item(Supplier<? extends T> supplier) {
T item;
try {
item = actual.get();
} catch (RuntimeException e) {
} catch (Throwable err) {
// Exception from the supplier, propagate it.
emitter.fail(e);
emitter.fail(err);
return;
}
if (item != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ private Function<Multi<Throwable>, Publisher<Long>> addPredicateToBackoffFactory
} else {
emitter.fail(throwable);
}
} catch (RuntimeException e) {
emitter.fail(e);
} catch (Throwable err) {
emitter.fail(err);
}
})).concatenate());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ private Function<Multi<Throwable>, Publisher<Long>> addPredicateToBackoffFactory
} else {
emitter.fail(throwable);
}
} catch (RuntimeException e) {
emitter.fail(e);
} catch (Throwable err) {
emitter.fail(err);
}
})).concatenate());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void onItem(T item) {
try {
Runnable dispatch = () -> downstream.onItem(item);
scheduledFuture = executor.schedule(dispatch, duration.toMillis(), TimeUnit.MILLISECONDS);
} catch (RuntimeException err) {
} catch (Throwable err) {
downstream.onFailure(err);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public void onItem(T item) {
return;
}
uni.runSubscriptionOn(executor).subscribe().with(ignored -> super.onItem(item), super::onFailure);
} catch (RuntimeException e) {
super.onFailure(e);
} catch (Throwable err) {
super.onFailure(err);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public void onFailure(Throwable failure) {
boolean test;
try {
test = predicate.test(failure);
} catch (RuntimeException e) {
downstream.onFailure(new CompositeException(failure, e));
} catch (Throwable err) {
downstream.onFailure(new CompositeException(failure, err));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ public void subscribe(UniSubscriber<? super T> subscriber) {
try {
state = holder.get();
// get() throws an NPE is the produced state is null.
} catch (Exception e) {
} catch (Throwable err) {
subscriber.onSubscribe(EmptyUniSubscription.DONE);
subscriber.onFailure(e);
subscriber.onFailure(err);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,18 @@ public void subscribe(UniSubscriber<? super T> subscriber) {
try {
state = holder.get();
// get() throws an NPE is the produced state is null.
} catch (Exception e) {
} catch (Throwable err) {
subscriber.onSubscribe(DONE);
subscriber.onFailure(e);
subscriber.onFailure(err);
return;
}

Uni<? extends T> uni;
try {
uni = mapper.apply(state);
} catch (Throwable e) {
} catch (Throwable err) {
subscriber.onSubscribe(DONE);
subscriber.onFailure(e);
subscriber.onFailure(err);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@ public void subscribe(UniSubscriber<? super T> subscriber) {
try {
state = holder.get();
// get() throws an NPE is the produced state is null.
} catch (Exception e) {
} catch (Throwable err) {
subscriber.onSubscribe(EmptyUniSubscription.DONE);
subscriber.onFailure(e);
subscriber.onFailure(err);
return;
}

DefaultUniEmitter<? super T> emitter = new DefaultUniEmitter<>(subscriber);
subscriber.onSubscribe(emitter);
try {
consumer.accept(state, emitter);
} catch (RuntimeException e) {
} catch (Throwable err) {
// we use the emitter to be sure that if the failure happens after the first event being fired, it
// will be dropped.
emitter.fail(e);
emitter.fail(err);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void subscribe(UniSubscriber<? super T> subscriber) {
} else {
subscriber.onFailure(failure);
}
} catch (RuntimeException err) {
} catch (Throwable err) {
subscriber.onFailure(err);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private void dispatchImmediateResult(Future<? extends T> future, UniSubscriber<?
downstream.onSubscribe(DONE);
downstream.onFailure(e.getCause());
return;
} catch (Exception err) {
} catch (Throwable err) {
if (err instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public void subscribe(UniSubscriber<? super T> subscriber) {
try {
T item = supplier.get();
subscriber.onItem(item);
} catch (RuntimeException err) {
} catch (Throwable err) {
subscriber.onFailure(err);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public void subscribe(UniSubscriber<? super T> subscriber) {
try {
state = holder.get();
// get() throws an NPE is the produced state is null.
} catch (Exception e) {
subscriber.onFailure(e);
} catch (Throwable err) {
subscriber.onFailure(err);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ public void subscribe(UniSubscriber<? super T> subscriber) {

try {
consumer.accept(emitter);
} catch (RuntimeException e) {
} catch (Throwable err) {
// we use the emitter to be sure that if the failure happens after the first event being fired, it
// will be dropped.
emitter.fail(e);
emitter.fail(err);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.smallrye.mutiny.helpers;

/**
* The infamous sneaky throw helper.
*/
public interface SneakyThrow {

@SuppressWarnings("unchecked")
static <E extends Throwable> RuntimeException sneakyThrow(Throwable e) throws E {
throw (E) e;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.smallrye.mutiny.operators;

import static io.smallrye.mutiny.helpers.SneakyThrow.sneakyThrow;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -79,6 +80,16 @@ public void testCreationWithExceptionThrownBySupplier() {
.assertFailedWith(IllegalStateException.class, "boom");
}

@Test
public void testCreationWithCheckedExceptionThrownBySupplier() {
Multi<Integer> multi = Multi.createFrom().item(() -> {
throw sneakyThrow(new Exception("boom"));
});
multi.subscribe().withSubscriber(AssertSubscriber.create())
.assertHasNotReceivedAnyItem()
.assertFailedWith(Exception.class, "boom");
}

@Test
public void testCreationFromAStreamWithRequest() {
Multi<Integer> multi = Multi.createFrom().items(Stream.of(1, 2, 3));
Expand Down

0 comments on commit 380138f

Please sign in to comment.