diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiCreate.java b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiCreate.java index 9fcc605b4..7935c3702 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiCreate.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiCreate.java @@ -556,6 +556,40 @@ public MultiResource resource(Supplier resourceSupplie return new MultiResource<>(actual, actualStreamSupplier); } + /** + * Creates a {@link Multi} from a resource, generated by a supplier function called for each individual + * {@link Subscriber}, while streaming the items from a {@link Publisher Publisher/Multi} created from the resource. + *

+ * Unlike {@link #resource(Supplier, Function)}, the {@code Supplier} produces a {@link Uni}. So, the actual + * resource can be resolved asynchronously. + *

+ * This method gets a resource and creates a {@link Publisher} from this resource (by calling the + * {@code streamSupplier} function once the {@code Uni} emits the resource instance). The subscriber receives the + * items from this {@link Publisher}. When the stream completes, fails or when the subscriber cancels the + * subscription, a finalizer is called to close the resource. This cleanup process can be either + * synchronous and asynchronous, as well as distinct for each type of event. + * + * If the Uni produced by the {@code resourceSupplier} emits a failure, the failure is propagated downstream. + * If the Uni produced by the {@code resourceSupplier} does not emit an item before downstream cancellation, the + * resource creation is cancelled. + * + * This method can be seen as a reactive version of the "try/finally" construct. + * + * @param resourceSupplier a supplier called for each subscriber to generate the resource, must not be {@code null}. + * The supplier produces a {@link Uni} emitting the resource. + * @param streamSupplier a function returning the stream for the given resource instance, must not be {@code null}. + * @param the type of the resource. + * @param the type of items emitted by the stream produced by the {@code streamSupplier}. + * @return an object to configure the finalizers. + */ + public MultiResourceUni resourceFromUni(Supplier> resourceSupplier, + Function> streamSupplier) { + Supplier> actual = Infrastructure.decorate(nonNull(resourceSupplier, "resourceSupplier")); + Function> actualStreamSupplier = Infrastructure + .decorate(nonNull(streamSupplier, "streamSupplier")); + return new MultiResourceUni<>(actual, actualStreamSupplier); + } + /** * Creates a {@link Multi} from on some initial state and a generator function. *

diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiResourceUni.java b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiResourceUni.java new file mode 100644 index 000000000..8c9caaf17 --- /dev/null +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiResourceUni.java @@ -0,0 +1,119 @@ +package io.smallrye.mutiny.groups; + +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.reactivestreams.Publisher; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.helpers.ParameterValidation; +import io.smallrye.mutiny.infrastructure.Infrastructure; + +/** + * Allows configuring a finalizer to close the resource attached to the stream. + * Unlike {@link MultiResource}, this version receives a {@code () -> Uni}, meaning that the resource instance can be + * resolved asynchronously. + * + * @param the type of resource + * @param the type of item emitted by the resulting {@code Multi} + * @see MultiCreate#resourceFromUni(Supplier, Function) + */ +public class MultiResourceUni { + private final Function> streamSupplier; + private final Supplier> resourceSupplier; + + public MultiResourceUni(Supplier> resourceSupplier, + Function> streamSupplier) { + this.resourceSupplier = resourceSupplier; + this.streamSupplier = streamSupplier; + } + + /** + * Configures a synchronous finalizer. The given function is called when the stream completes, fails or + * when the subscriber cancels. + * If the finalizer throws an exception, this exception is propagated to the subscriber, unless it has already + * cancelled. + * + * @param finalizer the finalizer, must not be {@code null} + * @return the multi + */ + public Multi withFinalizer(Consumer finalizer) { + Consumer callback = Infrastructure.decorate(ParameterValidation.nonNull(finalizer, "finalizer")); + Function> actual = r -> { + callback.accept(r); + return Uni.createFrom().voidItem(); + }; + return withFinalizer(actual, (r, ignored) -> actual.apply(r), actual); + } + + /** + * Configures an asynchronous finalizer. The given function is called when the stream completes, fails or + * when the subscriber cancels. The returned {@code Uni} is flattened with the stream meaning that the subscriber + * gets the events fired by the {@code Uni}. If the {@link Uni} completes successfully, the subscriber gets + * the {@code completion} event. If the {@link Uni} fails, the subscriber gets the failure even if the resource + * stream completed successfully. If the {@link Uni} fails after a resource stream failure, the subscriber receives + * a {@link io.smallrye.mutiny.CompositeException}. If the subscribers cancels, the {@link Uni} outcome is ignored. + *

+ * If the finalizer throws an exception, this exception is propagated to the subscriber, unless it has already + * cancelled. + * If the finalizer returns {@code null}, a {@link NullPointerException} is propagated to the subscriber, unless it + * has already cancelled. + * + * @param finalizer the finalizer, must not be {@code null} + * @return the multi + */ + public Multi withFinalizer(Function> finalizer) { + Function> actual = Infrastructure + .decorate(ParameterValidation.nonNull(finalizer, "finalizer")); + return withFinalizer(actual, (r, ignored) -> actual.apply(r), actual); + } + + /** + * Configures asynchronous finalizers distinct for each event. The given functions are called when the + * stream completes, fails or when the subscriber cancels. + *

+ * The returned {@code Uni} is flattened with the stream meaning that the subscriber + * gets the events fired by the {@code Uni}. If the {@link Uni} completes successfully, the subscriber gets + * the {@code completion} event. If the {@link Uni} fails, the subscriber gets the failure even if the resource + * stream completed successfully. If the {@link Uni} fails after a resource stream failure, the subscriber receives + * a {@link io.smallrye.mutiny.CompositeException}. If the subscribers cancels, the {@link Uni} outcome is ignored. + *

+ * If a finalizer throws an exception, this exception is propagated to the subscriber, unless it has already + * cancelled. + * If a finalizer returns {@code null}, a {@link NullPointerException} is propagated to the subscriber, unless it + * has already cancelled. + * + * @param onCompletion the completion finalizer called when the resource stream completes successfully. Must not be + * {@code null} + * @param onFailure the failure finalizer called when the resource stream propagated a failure. The finalizer is + * called with the resource and the failure. Must not be {@code null} + * @param onCancellation the cancellation finalizer called when the subscribers cancels the subscription. Must not + * be {@code null}. + * @return the multi + */ + public Multi withFinalizer( + Function> onCompletion, + BiFunction> onFailure, + Function> onCancellation) { + + return Uni.createFrom().deferred(() -> { + Uni uni; + try { + uni = resourceSupplier.get(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new IllegalArgumentException(e); + } + + if (uni == null) { + throw new IllegalArgumentException(ParameterValidation.SUPPLIER_PRODUCED_NULL); + } + return uni; + }).onItem().transformToMulti(res -> Multi.createFrom().resource(() -> res, streamSupplier) + .withFinalizer(onCompletion, onFailure, onCancellation)); + } +} diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/builders/MultiFromResourceFromUniTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/builders/MultiFromResourceFromUniTest.java new file mode 100644 index 000000000..e54803c50 --- /dev/null +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/builders/MultiFromResourceFromUniTest.java @@ -0,0 +1,811 @@ +package io.smallrye.mutiny.operators.multi.builders; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; + +import io.smallrye.mutiny.CompositeException; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.helpers.spies.Spy; +import io.smallrye.mutiny.helpers.spies.UniOnCancellationSpy; +import io.smallrye.mutiny.helpers.test.AssertSubscriber; + +@SuppressWarnings("ConstantConditions") +public class MultiFromResourceFromUniTest { + + @Test + public void testWithResourceSupplierThrowingException() { + Supplier> supplier = () -> { + throw new IllegalArgumentException("boom"); + }; + Multi multi = Multi.createFrom().resourceFromUni(supplier, + s -> Multi.createFrom().item(s)) + .withFinalizer(r -> { + }); + AssertSubscriber subscriber = multi.subscribe().withSubscriber(AssertSubscriber.create(10)); + subscriber + .assertFailedWith(IllegalArgumentException.class, "boom") + .assertHasNotReceivedAnyItem(); + } + + @Test + public void testWithResourceSupplierEmittingFailure() { + Supplier> supplier = () -> Uni.createFrom().failure(new IllegalArgumentException("boom")); + Multi multi = Multi.createFrom().resourceFromUni(supplier, + s -> Multi.createFrom().item(s)) + .withFinalizer(r -> { + }); + AssertSubscriber subscriber = multi.subscribe().withSubscriber(AssertSubscriber.create(10)); + subscriber + .assertFailedWith(IllegalArgumentException.class, "boom") + .assertHasNotReceivedAnyItem(); + } + + @Test + public void testWithResourceSupplierProducingNull() { + Supplier> supplier = () -> null; + Multi multi = Multi.createFrom().resourceFromUni(supplier, + s -> Multi.createFrom().items(s)) + .withFinalizer(r -> { + }); + AssertSubscriber subscriber = multi.subscribe().withSubscriber(AssertSubscriber.create(10)); + subscriber + .assertFailedWith(IllegalArgumentException.class, "") + .assertHasNotReceivedAnyItem(); + } + + @Test + public void testWithNullAsResourceSupplier() { + assertThrows(IllegalArgumentException.class, () -> Multi.createFrom().resourceFromUni(null, + s -> Multi.createFrom().items(s)) + .withFinalizer(r -> { + })); + } + + @Test + public void testWithStreamSupplierThrowingException() { + Supplier> supplier = () -> Uni.createFrom().item("Hello"); + Function> stream = s -> { + throw new IllegalArgumentException("boom"); + }; + Multi multi = Multi.createFrom().resourceFromUni(supplier, stream) + .withFinalizer(r -> { + }); + AssertSubscriber subscriber = multi.subscribe().withSubscriber(AssertSubscriber.create(10)); + subscriber + .assertFailedWith(IllegalArgumentException.class, "boom") + .assertHasNotReceivedAnyItem(); + } + + @Test + public void testWithStreamSupplierProducingNull() { + Supplier> supplier = () -> null; + Multi multi = Multi.createFrom().resourceFromUni(supplier, + s -> (Publisher) null) + .withFinalizer(r -> { + }); + AssertSubscriber subscriber = multi.subscribe().withSubscriber(AssertSubscriber.create(10)); + subscriber + .assertFailedWith(IllegalArgumentException.class, "") + .assertHasNotReceivedAnyItem(); + } + + @Test + public void testWithNullAsStreamSupplier() { + assertThrows(IllegalArgumentException.class, () -> Multi.createFrom() + .resourceFromUni(() -> Uni.createFrom().item("hello"), null) + .withFinalizer(r -> { + })); + } + + @Test + public void testWithNullAsFinalizer() { + assertThrows(IllegalArgumentException.class, + () -> Multi.createFrom().resourceFromUni(() -> Uni.createFrom().item("hello"), null) + .withFinalizer((Consumer) null)); + } + + @Test + public void testWithNullAsFinalizer2() { + assertThrows(IllegalArgumentException.class, + () -> Multi.createFrom().resourceFromUni(() -> Uni.createFrom().item("hello"), null) + .withFinalizer((Function>) null)); + } + + @Test + public void testWithNullAsFinalizer3() { + assertThrows(IllegalArgumentException.class, () -> { + Function> function = s -> Uni.createFrom().item(() -> null); + Multi.createFrom().resourceFromUni(() -> Uni.createFrom().item("hello"), null) + .withFinalizer(function, null, function); + }); + } + + @Test + public void testWithNullAsFinalizer4() { + assertThrows(IllegalArgumentException.class, () -> { + Function> function = s -> Uni.createFrom().item(() -> null); + BiFunction> onFailure = (s, f) -> Uni.createFrom().item(() -> null); + Multi.createFrom().resourceFromUni(() -> Uni.createFrom().item("hello"), null) + .withFinalizer(null, onFailure, function); + }); + } + + @Test + public void testWithNullAsFinalizer5() { + assertThrows(IllegalArgumentException.class, () -> { + Function> function = s -> Uni.createFrom().item(() -> null); + BiFunction> onFailure = (s, f) -> Uni.createFrom().item(() -> null); + Multi.createFrom().resourceFromUni(() -> Uni.createFrom().item("hello"), null) + .withFinalizer(function, onFailure, null); + }); + } + + @Test + public void simpleSynchronousTest() { + AssertSubscriber subscriber = AssertSubscriber.create(10); + AtomicInteger cleanup = new AtomicInteger(); + Multi.createFrom().resourceFromUni(() -> Uni.createFrom().item(1), r -> Multi.createFrom().range(r, 11)) + .withFinalizer(cleanup::set) + .subscribe(subscriber); + subscriber + .assertItems(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + .assertCompleted(); + assertThat(cleanup.get()).isEqualTo(1); + } + + @Test + public void simpleSynchronousTestWithMultipleSubscribers() { + AssertSubscriber subscriber1 = AssertSubscriber.create(10); + AssertSubscriber subscriber2 = AssertSubscriber.create(10); + List list = new ArrayList<>(); + AtomicInteger count = new AtomicInteger(); + Multi multi = Multi.createFrom().resourceFromUni(() -> Uni.createFrom().item(count::incrementAndGet), + r -> Multi.createFrom().range(r, 11)) + .withFinalizer((Consumer) list::add); + multi.subscribe(subscriber1); + multi.subscribe(subscriber2); + + subscriber1 + .assertItems(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + .assertCompleted(); + subscriber2 + .assertItems(2, 3, 4, 5, 6, 7, 8, 9, 10) + .assertCompleted(); + assertThat(list).containsExactly(1, 2); + } + + @Test + public void testCleanupCalledOnCompletionWithSynchronousFinalizer() { + AssertSubscriber subscriber = AssertSubscriber.create(9); + AtomicInteger cleanup = new AtomicInteger(); + Multi.createFrom().resourceFromUni(() -> Uni.createFrom().item(1), r -> Multi.createFrom().range(r, 11)) + .withFinalizer(cleanup::set) + .subscribe(subscriber); + subscriber + .assertItems(1, 2, 3, 4, 5, 6, 7, 8, 9) + .run(() -> assertThat(cleanup).hasValue(0)) + .request(1) + .assertItems(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + .run(() -> assertThat(cleanup).hasValue(1)) + .assertCompleted(); + } + + @Test + public void testCleanupCalledOnCancellationWithSynchronousFinalizer() { + AssertSubscriber subscriber = AssertSubscriber.create(4); + AtomicInteger cleanup = new AtomicInteger(); + Multi.createFrom().resourceFromUni(() -> Uni.createFrom().item(1), r -> Multi.createFrom().range(r, 11)) + .withFinalizer(cleanup::set) + .subscribe(subscriber); + subscriber + .assertItems(1, 2, 3, 4) + .run(() -> assertThat(cleanup).hasValue(0)) + .cancel() + .assertItems(1, 2, 3, 4) + .run(() -> assertThat(cleanup).hasValue(1)) + .assertNotTerminated(); + } + + @Test + public void testCleanupCalledOnFailureWithSynchronousFinalizer() { + AssertSubscriber subscriber = AssertSubscriber.create(1); + AtomicInteger cleanup = new AtomicInteger(); + Multi.createFrom() + .resourceFromUni(() -> Uni.createFrom().item(1), r -> Multi.createFrom(). emitter( + e -> e.emit(1).emit(2).fail(new IOException("boom")))) + .withFinalizer(cleanup::set) + .subscribe(subscriber); + subscriber + .assertItems(1) + .run(() -> assertThat(cleanup).hasValue(0)) + .request(3) + .assertItems(1, 2) + .run(() -> assertThat(cleanup).hasValue(1)) + .assertFailedWith(IOException.class, "boom"); + } + + @Test + public void testThatFinalizerIsNotCalledWhenResourceSupplierThrowsAnException() { + AssertSubscriber subscriber = AssertSubscriber.create(1); + Supplier> supplier = () -> { + throw new IllegalArgumentException("boom"); + }; + AtomicInteger cleanup = new AtomicInteger(); + Multi.createFrom() + .resourceFromUni(supplier, + r -> Multi.createFrom(). emitter(e -> e.emit(1).emit(2).fail(new IOException("boom")))) + .withFinalizer(cleanup::set) + .subscribe(subscriber); + subscriber + .assertFailedWith(IllegalArgumentException.class, "boom"); + assertThat(cleanup).hasValue(0); + } + + @Test + public void testThatFinalizerIsCalledWhenStreamSupplierThrowsAnException() { + AssertSubscriber subscriber = AssertSubscriber.create(1); + AtomicInteger cleanup = new AtomicInteger(); + Multi.createFrom(). resourceFromUni(() -> Uni.createFrom().item(1), s -> { + throw new IllegalArgumentException("boom"); + }) + .withFinalizer(cleanup::set) + .subscribe(subscriber); + subscriber + .assertFailedWith(IllegalArgumentException.class, "boom"); + assertThat(cleanup).hasValue(1); + } + + @Test + public void testThatFinalizerIsCalledWhenStreamSupplierReturnsNull() { + AssertSubscriber subscriber = AssertSubscriber.create(1); + AtomicInteger cleanup = new AtomicInteger(); + Multi.createFrom(). resourceFromUni(() -> Uni.createFrom().item(1), s -> null) + .withFinalizer(cleanup::set) + .subscribe(subscriber); + subscriber + .assertFailedWith(IllegalArgumentException.class, "`null`"); + assertThat(cleanup).hasValue(1); + } + + @Test + public void testThatFinalizerThrowingException() { + AssertSubscriber subscriber = AssertSubscriber.create(20); + Consumer fin = s -> { + throw new IllegalStateException("boom"); + }; + Multi.createFrom().resourceFromUni(() -> Uni.createFrom().item(1), s -> Multi.createFrom().range(s, 11)) + .withFinalizer(fin) + .subscribe(subscriber); + subscriber + .assertItems(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + .assertFailedWith(IllegalStateException.class, "boom"); + } + + @Test + public void testThatFinalizerThrowingExceptionAfterStreamFailure() { + AssertSubscriber subscriber = AssertSubscriber.create(20); + Consumer fin = s -> { + throw new IllegalStateException("boom"); + }; + Multi.createFrom() + .resourceFromUni(() -> Uni.createFrom().item(1), + r -> Multi.createFrom(). emitter(e -> e.emit(1).emit(2).fail(new IOException("no!")))) + .withFinalizer(fin) + .subscribe(subscriber); + subscriber + .assertItems(1, 2) + .assertFailedWith(CompositeException.class, "boom") + .assertFailedWith(CompositeException.class, "no!"); + + } + + @Test + public void testThatOnFailureFinalizerIsNotCallIfResourceSupplierThrowsAnException() { + AssertSubscriber subscriber = AssertSubscriber.create(20); + Supplier> supplier = () -> { + throw new NullPointerException("boom"); + }; + AtomicInteger onFailure = new AtomicInteger(); + AtomicInteger onComplete = new AtomicInteger(); + AtomicInteger onCancellation = new AtomicInteger(); + + BiFunction> onFailureCallback = (s, f) -> { + onFailure.set(s); + return Uni.createFrom().voidItem(); + }; + + Function> onCompletionCallback = s -> { + onComplete.set(s); + return Uni.createFrom().voidItem(); + }; + + Function> onCancellationCallback = s -> { + onCancellation.set(s); + return Uni.createFrom().voidItem(); + }; + + Multi.createFrom().resourceFromUni(supplier, + r -> Multi.createFrom().range(r, 11)) + .withFinalizer(onCompletionCallback, onFailureCallback, onCancellationCallback) + .subscribe(subscriber); + + subscriber + .assertFailedWith(NullPointerException.class, "boom"); + assertThat(onFailure).hasValue(0); + assertThat(onCancellation).hasValue(0); + assertThat(onComplete).hasValue(0); + } + + @Test + public void cancellationShouldBePossible() { + AssertSubscriber subscriber = AssertSubscriber.create(20); + Supplier> supplier = () -> Uni.createFrom().item(1); + AtomicInteger onFailure = new AtomicInteger(); + AtomicInteger onComplete = new AtomicInteger(); + AtomicInteger onCancellation = new AtomicInteger(); + + BiFunction> onFailureCallback = (s, f) -> { + onFailure.set(s); + return Uni.createFrom().voidItem(); + }; + + Function> onCompletionCallback = s -> { + onComplete.set(s); + return Uni.createFrom().voidItem(); + }; + + Function> onCancellationCallback = s -> { + onCancellation.set(s); + return Uni.createFrom().voidItem(); + }; + + Multi.createFrom(). resourceFromUni(supplier, + r -> Multi.createFrom().nothing()) + .withFinalizer(onCompletionCallback, onFailureCallback, onCancellationCallback) + .subscribe(subscriber); + + subscriber + .cancel(); + assertThat(onFailure).hasValue(0); + assertThat(onCancellation).hasValue(1); + assertThat(onComplete).hasValue(0); + } + + @Test + public void testWithFakeTransactionalResource() { + Multi multi = Multi.createFrom().resourceFromUni( + FakeTransactionalResource::create, + FakeTransactionalResource::data) + .withFinalizer(FakeTransactionalResource::commit, FakeTransactionalResource::rollback, + FakeTransactionalResource::cancel); + + multi.subscribe().withSubscriber(AssertSubscriber.create(20)) + .await() + .assertItems("in transaction") + .assertCompleted(); + + assertThat(FakeTransactionalResource.last.subscribed).isTrue(); + assertThat(FakeTransactionalResource.last.onCompleteSubscribed).isTrue(); + assertThat(FakeTransactionalResource.last.onCancelSubscribed).isFalse(); + assertThat(FakeTransactionalResource.last.onFailureSubscribed).isFalse(); + } + + @Test + public void testThatStreamSupplierThrowingExceptionCallsOnFailure() { + FakeTransactionalResource resource = new FakeTransactionalResource(); + + Multi multi = Multi.createFrom(). resource(() -> resource, r -> { + throw new IllegalStateException("boom"); + }) + .withFinalizer(FakeTransactionalResource::commit, FakeTransactionalResource::rollback, + FakeTransactionalResource::cancel); + + multi.subscribe().withSubscriber(AssertSubscriber.create(20)) + .assertFailedWith(IllegalStateException.class, "boom"); + + assertThat(resource.subscribed).isFalse(); + assertThat(resource.onCompleteSubscribed).isFalse(); + assertThat(resource.onCancelSubscribed).isFalse(); + assertThat(resource.onFailureSubscribed).isTrue(); + assertThat(resource.failure.get()).isInstanceOf(IllegalStateException.class).hasMessage("boom"); + } + + @Test + public void testThatStreamSupplierReturningNullCallsOnFailure() { + FakeTransactionalResource resource = new FakeTransactionalResource(); + + Multi multi = Multi.createFrom(). resource(() -> resource, r -> null) + .withFinalizer(FakeTransactionalResource::commit, FakeTransactionalResource::rollback, + FakeTransactionalResource::cancel); + + multi.subscribe().withSubscriber(AssertSubscriber.create(20)) + .assertFailedWith(IllegalArgumentException.class, "`null`"); + + assertThat(resource.subscribed).isFalse(); + assertThat(resource.onCompleteSubscribed).isFalse(); + assertThat(resource.onCancelSubscribed).isFalse(); + assertThat(resource.onFailureSubscribed).isTrue(); + assertThat(resource.failure.get()).isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void testThatStreamSupplierEmittingAFailureCallsOnFailure() { + FakeTransactionalResource resource = new FakeTransactionalResource(); + + Multi multi = Multi.createFrom(). resource(() -> resource, + r -> Multi.createFrom().failure(new IOException("boom"))) + .withFinalizer(FakeTransactionalResource::commit, FakeTransactionalResource::rollback, + FakeTransactionalResource::cancel); + + multi.subscribe().withSubscriber(AssertSubscriber.create(20)) + .assertFailedWith(IOException.class, "boom"); + + assertThat(resource.subscribed).isFalse(); + assertThat(resource.onCompleteSubscribed).isFalse(); + assertThat(resource.onCancelSubscribed).isFalse(); + assertThat(resource.onFailureSubscribed).isTrue(); + assertThat(resource.failure.get()).isInstanceOf(IOException.class); + } + + @Test + public void testThatCancellationDueToPartialConsumptionCallsOnCancel() { + + Multi multi = Multi.createFrom().resourceFromUni( + FakeTransactionalResource::create, + FakeTransactionalResource::infinite) + .withFinalizer(FakeTransactionalResource::commit, FakeTransactionalResource::rollback, + FakeTransactionalResource::cancel) + .select().first(3); + + multi.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)) + .await() + .assertItems("0", "1", "2") + .assertCompleted(); + + assertThat(FakeTransactionalResource.last.subscribed).isTrue(); + assertThat(FakeTransactionalResource.last.onCompleteSubscribed).isFalse(); + assertThat(FakeTransactionalResource.last.onCancelSubscribed).isTrue(); + assertThat(FakeTransactionalResource.last.onFailureSubscribed).isFalse(); + } + + @Test + public void testThatCancellationFailureAreNotPropagated() { + + Multi multi = Multi.createFrom() + .resourceFromUni(() -> FakeTransactionalResource.create(), FakeTransactionalResource::infinite) + .withFinalizer(FakeTransactionalResource::commit, FakeTransactionalResource::rollback, + r -> r.cancel().onItem().failWith(x -> new IOException("boom"))) + .select().first(3); + + multi.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)) + .await() + .assertItems("0", "1", "2") + .assertCompleted(); + + assertThat(FakeTransactionalResource.last.subscribed).isTrue(); + assertThat(FakeTransactionalResource.last.onCompleteSubscribed).isFalse(); + assertThat(FakeTransactionalResource.last.onCancelSubscribed).isTrue(); + assertThat(FakeTransactionalResource.last.onFailureSubscribed).isFalse(); + } + + @Test + public void testThatCancellationReturningNullAreNotPropagated() { + Multi multi = Multi.createFrom() + .resourceFromUni(() -> FakeTransactionalResource.create(), FakeTransactionalResource::infinite) + .withFinalizer(FakeTransactionalResource::commit, FakeTransactionalResource::rollback, + r -> null) + .select().first(3); + + multi.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)) + .await() + .assertItems("0", "1", "2") + .assertCompleted(); + + assertThat(FakeTransactionalResource.last.subscribed).isTrue(); + assertThat(FakeTransactionalResource.last.onCompleteSubscribed).isFalse(); + assertThat(FakeTransactionalResource.last.onCancelSubscribed).isFalse(); + assertThat(FakeTransactionalResource.last.onFailureSubscribed).isFalse(); + } + + @Test + public void testThatCompletionFailureArePropagated() { + Multi multi = Multi.createFrom() + .resourceFromUni(FakeTransactionalResource::create, FakeTransactionalResource::data) + .withFinalizer(r -> r.commit().onItem().failWith(x -> new IOException("boom")), + FakeTransactionalResource::rollback, + FakeTransactionalResource::cancel) + .select().first(3); + + multi.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)) + .await() + .assertItems("in transaction") + .assertFailedWith(IOException.class, "boom"); + + assertThat(FakeTransactionalResource.last.subscribed).isTrue(); + assertThat(FakeTransactionalResource.last.onCompleteSubscribed).isTrue(); + assertThat(FakeTransactionalResource.last.onCancelSubscribed).isFalse(); + assertThat(FakeTransactionalResource.last.onFailureSubscribed).isFalse(); + } + + @Test + public void testThatCompletionFailureArePropagated2() { + Multi multi = Multi.createFrom().resourceFromUni(FakeTransactionalResource::create, + FakeTransactionalResource::data) + .withFinalizer(FakeTransactionalResource::commitFailure, + FakeTransactionalResource::rollback, + FakeTransactionalResource::cancel) + .select().first(3); + + multi.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)) + .await() + .assertItems("in transaction") + .assertFailedWith(IOException.class, "commit failed"); + + assertThat(FakeTransactionalResource.last.subscribed).isTrue(); + assertThat(FakeTransactionalResource.last.onCompleteSubscribed).isTrue(); + assertThat(FakeTransactionalResource.last.onCancelSubscribed).isFalse(); + assertThat(FakeTransactionalResource.last.onFailureSubscribed).isFalse(); + } + + @Test + public void testWithOnCompletionReturningNull() { + Multi multi = Multi.createFrom().resourceFromUni(FakeTransactionalResource::create, + FakeTransactionalResource::data) + .withFinalizer( + FakeTransactionalResource::commitReturningNull, + FakeTransactionalResource::rollback, + FakeTransactionalResource::cancel) + .select().first(3); + + multi.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)) + .await() + .assertItems("in transaction") + .assertFailedWith(NullPointerException.class, "`null`"); + + assertThat(FakeTransactionalResource.last.subscribed).isTrue(); + assertThat(FakeTransactionalResource.last.onCompleteSubscribed).isFalse(); + assertThat(FakeTransactionalResource.last.onCancelSubscribed).isFalse(); + assertThat(FakeTransactionalResource.last.onFailureSubscribed).isFalse(); + } + + @Test + public void testThatOnFailureFailureArePropagated() { + Multi multi = Multi.createFrom().resourceFromUni(FakeTransactionalResource::create, + r -> r.data().onCompletion().failWith(new IOException("boom"))) + .withFinalizer(FakeTransactionalResource::commit, FakeTransactionalResource::rollbackFailure, + FakeTransactionalResource::cancel); + + multi.subscribe().withSubscriber(AssertSubscriber.create(20)) + .await() + .assertFailedWith(CompositeException.class, "boom") + .assertFailedWith(CompositeException.class, "rollback failed"); + + assertThat(FakeTransactionalResource.last.subscribed).isTrue(); + assertThat(FakeTransactionalResource.last.onCompleteSubscribed).isFalse(); + assertThat(FakeTransactionalResource.last.onCancelSubscribed).isFalse(); + assertThat(FakeTransactionalResource.last.onFailureSubscribed).isTrue(); + assertThat(FakeTransactionalResource.last.failure.get()).isInstanceOf(IOException.class); + } + + @Test + public void testWithOnFailureReturningNull() { + Multi multi = Multi.createFrom().resourceFromUni(FakeTransactionalResource::create, + r -> r.data().onCompletion().failWith(new IOException("boom"))) + .withFinalizer(FakeTransactionalResource::commit, FakeTransactionalResource::rollbackReturningNull, + FakeTransactionalResource::cancel); + + multi.subscribe().withSubscriber(AssertSubscriber.create(20)) + .await() + .assertFailedWith(CompositeException.class, "boom") + .assertFailedWith(CompositeException.class, "`null`"); + + assertThat(FakeTransactionalResource.last.subscribed).isTrue(); + assertThat(FakeTransactionalResource.last.onCompleteSubscribed).isFalse(); + assertThat(FakeTransactionalResource.last.onCancelSubscribed).isFalse(); + assertThat(FakeTransactionalResource.last.onFailureSubscribed).isFalse(); + assertThat(FakeTransactionalResource.last.failure.get()).isInstanceOf(IOException.class); + } + + @Test + public void testOnCompletionWithSingleFinalizer() { + AtomicBoolean subscribed = new AtomicBoolean(); + Multi multi = Multi.createFrom() + .resourceFromUni(() -> Uni.createFrom().item(1), x -> Multi.createFrom().range(x, 11)) + .withFinalizer(r -> { + return Uni.createFrom().item("ok") + .onSubscribe().invoke(s -> subscribed.set(true)) + .onItem().ignore().andContinueWithNull(); + }); + multi.subscribe().withSubscriber(AssertSubscriber.create(20)) + .assertCompleted() + .assertItems(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + assertThat(subscribed).isTrue(); + } + + @Test + public void testOnFailureWithSingleFinalizer() { + AtomicBoolean subscribed = new AtomicBoolean(); + Multi multi = Multi.createFrom() + .resourceFromUni(() -> Uni.createFrom().item(1), + x -> Multi.createFrom().range(x, 11).onCompletion().failWith(new IOException("boom"))) + .withFinalizer(r -> { + return Uni.createFrom().item("ok") + .onSubscribe().invoke(s -> subscribed.set(true)) + .onItem().ignore().andContinueWithNull(); + }); + multi.subscribe().withSubscriber(AssertSubscriber.create(20)) + .assertFailedWith(IOException.class, "boom") + .assertItems(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + assertThat(subscribed).isTrue(); + } + + @Test + public void testOnCancellationWithSingleFinalizer() { + AtomicBoolean subscribed = new AtomicBoolean(); + Multi multi = Multi.createFrom() + .resourceFromUni(() -> Uni.createFrom().item(1), + x -> Multi.createFrom().ticks().every(Duration.ofMillis(10))) + .withFinalizer(r -> { + return Uni.createFrom().item("ok") + .onSubscribe().invoke(s -> subscribed.set(true)) + .onItem().ignore().andContinueWithNull(); + }) + .select().first(5); + multi.subscribe().withSubscriber(AssertSubscriber.create(20)) + .await() + .assertCompleted() + .assertItems(0L, 1L, 2L, 3L, 4L); + assertThat(subscribed).isTrue(); + } + + @Test + public void testThatOnCancellationIsNotCalledAfterCompletion() { + AssertSubscriber subscriber = AssertSubscriber.create(4); + Multi.createFrom().resourceFromUni(FakeTransactionalResource::create, FakeTransactionalResource::data) + .withFinalizer( + FakeTransactionalResource::commit, + FakeTransactionalResource::rollback, + FakeTransactionalResource::cancel) + .subscribe(subscriber); + subscriber + .await() + .assertCompleted() + .cancel(); + + assertThat(FakeTransactionalResource.last.onCompleteSubscribed).isTrue(); + assertThat(FakeTransactionalResource.last.onCancelSubscribed).isFalse(); + assertThat(FakeTransactionalResource.last.onFailureSubscribed).isFalse(); + } + + @Test + public void testThatOnCancellationIsNotCalledAfterFailure() { + AssertSubscriber subscriber = AssertSubscriber.create(4); + Multi.createFrom().resourceFromUni(FakeTransactionalResource::create, + r -> r.data().onCompletion().failWith(new IOException("boom"))) + .withFinalizer( + FakeTransactionalResource::commit, + FakeTransactionalResource::rollbackDelay, + FakeTransactionalResource::cancel) + .subscribe(subscriber); + subscriber + .await() + .assertFailedWith(IOException.class, "boom") + .cancel(); + + assertThat(FakeTransactionalResource.last.onCompleteSubscribed).isFalse(); + assertThat(FakeTransactionalResource.last.onCancelSubscribed).isFalse(); + assertThat(FakeTransactionalResource.last.onFailureSubscribed).isTrue(); + } + + @Test + public void testThatFinalizerIsNotCalledIfTheUniDoesNothing() { + AssertSubscriber subscriber = AssertSubscriber.create(10); + AtomicInteger cleanup = new AtomicInteger(); + UniOnCancellationSpy nothing = Spy.onCancellation(Uni.createFrom().nothing()); + Multi.createFrom().resourceFromUni(() -> nothing, r -> Multi.createFrom().range(r, 11)) + .withFinalizer(cleanup::set) + .subscribe(subscriber); + subscriber + .assertSubscribed() + .cancel(); + + subscriber.assertNotTerminated(); + assertThat(cleanup.get()).isEqualTo(0); + assertThat(nothing.invoked()).isTrue(); + } + + static class FakeTransactionalResource { + + private static final Duration DELAY = Duration.ofMillis(100); + + static volatile FakeTransactionalResource last; + + static Uni create() { + FakeTransactionalResource item = new FakeTransactionalResource(); + last = item; + return Uni.createFrom().item(item) + .onItem().delayIt().by(Duration.ofMillis(10)); + } + + AtomicBoolean subscribed = new AtomicBoolean(); + AtomicBoolean onFailureSubscribed = new AtomicBoolean(); + AtomicBoolean onCompleteSubscribed = new AtomicBoolean(); + AtomicBoolean onCancelSubscribed = new AtomicBoolean(); + + AtomicReference failure = new AtomicReference<>(); + + public Multi data() { + return Multi.createFrom().item("in transaction") + .onSubscribe().invoke(s -> subscribed.set(true)); + } + + public Multi infinite() { + return Multi.createFrom().ticks().every(Duration.ofMillis(10)) + .onItem().transform(l -> Long.toString(l)) + .onSubscribe().invoke(s -> subscribed.set(true)); + } + + public Uni commit() { + return Uni.createFrom().voidItem() + .onSubscribe().invoke(s -> onCompleteSubscribed.set(true)); + } + + public Uni commitFailure() { + return Uni.createFrom().voidItem() + .onItem().delayIt().by(DELAY) + .onItem().failWith(x -> new IOException("commit failed")) + .onSubscribe().invoke(s -> onCompleteSubscribed.set(true)); + } + + public Uni commitReturningNull() { + return null; + } + + public Uni rollback(Throwable failure) { + return Uni.createFrom().voidItem() + .onItem().invoke(x -> this.failure.set(failure)) + .onSubscribe().invoke(s -> onFailureSubscribed.set(true)); + } + + public Uni rollbackDelay(Throwable failure) { + return Uni.createFrom().voidItem() + .onItem().invoke(x -> this.failure.set(failure)) + .onItem().delayIt().by(DELAY) + .onSubscribe().invoke(s -> onFailureSubscribed.set(true)); + } + + public Uni rollbackFailure(Throwable failure) { + return Uni.createFrom().voidItem() + .onItem().invoke(x -> this.failure.set(failure)) + .onItem().delayIt().by(DELAY) + .onItem().failWith(x -> new IOException("rollback failed")) + .onSubscribe().invoke(s -> onFailureSubscribed.set(true)); + } + + public Uni rollbackReturningNull(Throwable f) { + failure.set(f); + return null; + } + + public Uni cancel() { + return Uni.createFrom().voidItem() + .onSubscribe().invoke(s -> onCancelSubscribed.set(true)); + } + } +}