Skip to content

Commit

Permalink
Merge pull request #489 from smallrye/add-multi-createFrom-resourceFr…
Browse files Browse the repository at this point in the history
…omUni

Add Uni.createFrom().resourceFromUni
  • Loading branch information
jponge authored Feb 25, 2021
2 parents 1007d03 + e2db447 commit 35a24f4
Show file tree
Hide file tree
Showing 3 changed files with 964 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,40 @@ public <R, I> MultiResource<R, I> resource(Supplier<? extends R> resourceSupplie
return new MultiResource<>(actual, actualStreamSupplier);
}

/**
* Creates a {@link Multi} from a <em>resource</em>, generated by a supplier function called for each individual
* {@link Subscriber}, while streaming the items from a {@link Publisher Publisher/Multi} created from the resource.
* <p>
* Unlike {@link #resource(Supplier, Function)}, the {@code Supplier} produces a {@link Uni}. So, the actual
* <em>resource</em> can be resolved asynchronously.
* </p>
* This method gets a <em>resource</em> and creates a {@link Publisher} from this resource (by calling the
* {@code streamSupplier} function once the {@code Uni} emits the resource instance). The subscriber receives the
* items from this {@link Publisher}. When the stream completes, fails or when the subscriber cancels the
* subscription, a finalizer is called to <em>close</em> the resource. This cleanup process can be either
* synchronous and asynchronous, as well as distinct for each type of event.
*
* If the Uni produced by the {@code resourceSupplier} emits a failure, the failure is propagated downstream.
* If the Uni produced by the {@code resourceSupplier} does not emit an item before downstream cancellation, the
* resource creation is cancelled.
*
* This method can be seen as a reactive version of the "try/finally" construct.
*
* @param resourceSupplier a supplier called for each subscriber to generate the resource, must not be {@code null}.
* The supplier produces a {@link Uni} emitting the resource.
* @param streamSupplier a function returning the stream for the given resource instance, must not be {@code null}.
* @param <R> the type of the resource.
* @param <I> the type of items emitted by the stream produced by the {@code streamSupplier}.
* @return an object to configure the <em>finalizers</em>.
*/
public <R, I> MultiResourceUni<R, I> resourceFromUni(Supplier<Uni<R>> resourceSupplier,
Function<? super R, ? extends Publisher<I>> streamSupplier) {
Supplier<Uni<R>> actual = Infrastructure.decorate(nonNull(resourceSupplier, "resourceSupplier"));
Function<? super R, ? extends Publisher<I>> actualStreamSupplier = Infrastructure
.decorate(nonNull(streamSupplier, "streamSupplier"));
return new MultiResourceUni<>(actual, actualStreamSupplier);
}

/**
* Creates a {@link Multi} from on some initial state and a generator function.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package io.smallrye.mutiny.groups;

import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.reactivestreams.Publisher;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.infrastructure.Infrastructure;

/**
* Allows configuring a <em>finalizer</em> to close the resource attached to the stream.
* Unlike {@link MultiResource}, this version receives a {@code () -> Uni<R>}, meaning that the resource instance can be
* resolved asynchronously.
*
* @param <R> the type of resource
* @param <I> the type of item emitted by the resulting {@code Multi}
* @see MultiCreate#resourceFromUni(Supplier, Function)
*/
public class MultiResourceUni<R, I> {
private final Function<? super R, ? extends Publisher<I>> streamSupplier;
private final Supplier<Uni<R>> resourceSupplier;

public MultiResourceUni(Supplier<Uni<R>> resourceSupplier,
Function<? super R, ? extends Publisher<I>> streamSupplier) {
this.resourceSupplier = resourceSupplier;
this.streamSupplier = streamSupplier;
}

/**
* Configures a <em>synchronous</em> finalizer. The given function is called when the stream completes, fails or
* when the subscriber cancels.
* If the finalizer throws an exception, this exception is propagated to the subscriber, unless it has already
* cancelled.
*
* @param finalizer the finalizer, must not be {@code null}
* @return the multi
*/
public Multi<I> withFinalizer(Consumer<? super R> finalizer) {
Consumer<? super R> callback = Infrastructure.decorate(ParameterValidation.nonNull(finalizer, "finalizer"));
Function<? super R, Uni<Void>> actual = r -> {
callback.accept(r);
return Uni.createFrom().voidItem();
};
return withFinalizer(actual, (r, ignored) -> actual.apply(r), actual);
}

/**
* Configures an <em>asynchronous</em> finalizer. The given function is called when the stream completes, fails or
* when the subscriber cancels. The returned {@code Uni} is flattened with the stream meaning that the subscriber
* gets the events fired by the {@code Uni}. If the {@link Uni} completes successfully, the subscriber gets
* the {@code completion} event. If the {@link Uni} fails, the subscriber gets the failure even if the resource
* stream completed successfully. If the {@link Uni} fails after a resource stream failure, the subscriber receives
* a {@link io.smallrye.mutiny.CompositeException}. If the subscribers cancels, the {@link Uni} outcome is ignored.
* <p>
* If the finalizer throws an exception, this exception is propagated to the subscriber, unless it has already
* cancelled.
* If the finalizer returns {@code null}, a {@link NullPointerException} is propagated to the subscriber, unless it
* has already cancelled.
*
* @param finalizer the finalizer, must not be {@code null}
* @return the multi
*/
public Multi<I> withFinalizer(Function<? super R, Uni<Void>> finalizer) {
Function<? super R, Uni<Void>> actual = Infrastructure
.decorate(ParameterValidation.nonNull(finalizer, "finalizer"));
return withFinalizer(actual, (r, ignored) -> actual.apply(r), actual);
}

/**
* Configures <em>asynchronous</em> finalizers distinct for each event. The given functions are called when the
* stream completes, fails or when the subscriber cancels.
* <p>
* The returned {@code Uni} is flattened with the stream meaning that the subscriber
* gets the events fired by the {@code Uni}. If the {@link Uni} completes successfully, the subscriber gets
* the {@code completion} event. If the {@link Uni} fails, the subscriber gets the failure even if the resource
* stream completed successfully. If the {@link Uni} fails after a resource stream failure, the subscriber receives
* a {@link io.smallrye.mutiny.CompositeException}. If the subscribers cancels, the {@link Uni} outcome is ignored.
* <p>
* If a finalizer throws an exception, this exception is propagated to the subscriber, unless it has already
* cancelled.
* If a finalizer returns {@code null}, a {@link NullPointerException} is propagated to the subscriber, unless it
* has already cancelled.
*
* @param onCompletion the completion finalizer called when the resource stream completes successfully. Must not be
* {@code null}
* @param onFailure the failure finalizer called when the resource stream propagated a failure. The finalizer is
* called with the resource and the failure. Must not be {@code null}
* @param onCancellation the cancellation finalizer called when the subscribers cancels the subscription. Must not
* be {@code null}.
* @return the multi
*/
public Multi<I> withFinalizer(
Function<? super R, Uni<Void>> onCompletion,
BiFunction<? super R, ? super Throwable, Uni<Void>> onFailure,
Function<? super R, Uni<Void>> onCancellation) {

return Uni.createFrom().deferred(() -> {
Uni<R> uni;
try {
uni = resourceSupplier.get();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new IllegalArgumentException(e);
}

if (uni == null) {
throw new IllegalArgumentException(ParameterValidation.SUPPLIER_PRODUCED_NULL);
}
return uni;
}).onItem().transformToMulti(res -> Multi.createFrom().resource(() -> res, streamSupplier)
.withFinalizer(onCompletion, onFailure, onCancellation));
}
}
Loading

0 comments on commit 35a24f4

Please sign in to comment.