From ac80ee9bfb87a258fec23ba5de91868bd3bc31d9 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 7 Dec 2020 21:38:47 +0100 Subject: [PATCH 1/4] Refactor the Uni codebase to generalise on UniSubscriber instead of UniSerializedSubscriber --- .../mutiny/helpers/spies/UniGlobalSpy.java | 4 +-- .../helpers/spies/UniOnCancellationSpy.java | 4 +-- .../mutiny/helpers/spies/UniOnFailureSpy.java | 4 +-- .../helpers/spies/UniOnItemOrFailureSpy.java | 4 +-- .../mutiny/helpers/spies/UniOnItemSpy.java | 4 +-- .../helpers/spies/UniOnSubscribeSpy.java | 4 +-- .../helpers/spies/UniOnTerminationSpy.java | 4 +-- .../mutiny/operators/AbstractUni.java | 2 +- .../mutiny/operators/UniAndCombination.java | 6 ++-- .../UniCreateFromCompletionStage.java | 5 ++-- .../UniCreateFromDeferredSupplier.java | 3 +- .../operators/UniCreateFromPublisher.java | 3 +- .../operators/UniCreateWithEmitter.java | 3 +- .../mutiny/operators/UniDelayOnItem.java | 3 +- .../mutiny/operators/UniDelayUntil.java | 3 +- .../operators/UniDelegatingSubscriber.java | 4 +-- .../smallrye/mutiny/operators/UniEmitOn.java | 3 +- .../mutiny/operators/UniFailOnTimeout.java | 4 +-- .../mutiny/operators/UniMemoizeOp.java | 14 ++++----- .../smallrye/mutiny/operators/UniNever.java | 4 ++- .../mutiny/operators/UniOnCancellation.java | 3 +- .../operators/UniOnCancellationCall.java | 3 +- .../mutiny/operators/UniOnFailureFlatMap.java | 3 +- .../operators/UniOnFailureTransform.java | 3 +- .../mutiny/operators/UniOnItemConsume.java | 5 ++-- .../operators/UniOnItemOrFailureConsume.java | 5 ++-- .../operators/UniOnItemOrFailureFlatMap.java | 5 ++-- .../operators/UniOnItemOrFailureMap.java | 29 ++++++++++++++----- .../mutiny/operators/UniOnItemTransform.java | 3 +- .../operators/UniOnItemTransformToUni.java | 6 ++-- .../mutiny/operators/UniOnSubscribeCall.java | 3 +- .../operators/UniOnSubscribeInvoke.java | 3 +- .../mutiny/operators/UniOnTermination.java | 3 +- .../operators/UniOnTerminationCall.java | 3 +- .../mutiny/operators/UniOrCombination.java | 2 +- .../mutiny/operators/UniRetryAtMost.java | 4 +-- .../mutiny/operators/UniRunSubscribeOn.java | 4 +-- .../uni/builders/KnownFailureUni.java | 4 +-- .../operators/uni/builders/KnownItemUni.java | 4 +-- .../infrastructure/UniInterceptorTest.java | 5 ++-- .../mutiny/operators/UniPlugTest.java | 3 +- .../operators/UniRunSubscriptionOnTest.java | 3 +- .../UniSerializedSubscriberTest.java | 21 +++++++------- 43 files changed, 123 insertions(+), 89 deletions(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniGlobalSpy.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniGlobalSpy.java index dbef9aa2e..264dfc3c5 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniGlobalSpy.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniGlobalSpy.java @@ -1,7 +1,7 @@ package io.smallrye.mutiny.helpers.spies; import io.smallrye.mutiny.Uni; -import io.smallrye.mutiny.operators.UniSerializedSubscriber; +import io.smallrye.mutiny.subscription.UniSubscriber; public class UniGlobalSpy extends UniSpyBase { @@ -68,7 +68,7 @@ public void reset() { } @Override - protected void subscribing(UniSerializedSubscriber downstream) { + protected void subscribing(UniSubscriber downstream) { onTerminationSpy.subscribe().withSubscriber(downstream); } diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnCancellationSpy.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnCancellationSpy.java index 93c63c8c8..72f6f9b37 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnCancellationSpy.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnCancellationSpy.java @@ -1,7 +1,7 @@ package io.smallrye.mutiny.helpers.spies; import io.smallrye.mutiny.Uni; -import io.smallrye.mutiny.operators.UniSerializedSubscriber; +import io.smallrye.mutiny.subscription.UniSubscriber; public class UniOnCancellationSpy extends UniSpyBase { @@ -10,7 +10,7 @@ public class UniOnCancellationSpy extends UniSpyBase { } @Override - protected void subscribing(UniSerializedSubscriber downstream) { + protected void subscribing(UniSubscriber downstream) { upstream() .onCancellation().invoke(this::incrementInvocationCount) .subscribe().withSubscriber(downstream); diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnFailureSpy.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnFailureSpy.java index e0ba95d36..c2dd4cb51 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnFailureSpy.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnFailureSpy.java @@ -4,7 +4,7 @@ import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.groups.UniOnFailure; -import io.smallrye.mutiny.operators.UniSerializedSubscriber; +import io.smallrye.mutiny.subscription.UniSubscriber; public class UniOnFailureSpy extends UniSpyBase { @@ -38,7 +38,7 @@ public void reset() { } @Override - protected void subscribing(UniSerializedSubscriber downstream) { + protected void subscribing(UniSubscriber downstream) { UniOnFailure group; if (predicate != null) { group = upstream().onFailure(predicate); diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnItemOrFailureSpy.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnItemOrFailureSpy.java index 817495cc0..647b2d8a1 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnItemOrFailureSpy.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnItemOrFailureSpy.java @@ -1,7 +1,7 @@ package io.smallrye.mutiny.helpers.spies; import io.smallrye.mutiny.Uni; -import io.smallrye.mutiny.operators.UniSerializedSubscriber; +import io.smallrye.mutiny.subscription.UniSubscriber; public class UniOnItemOrFailureSpy extends UniSpyBase { @@ -32,7 +32,7 @@ public Throwable lastFailure() { } @Override - protected void subscribing(UniSerializedSubscriber downstream) { + protected void subscribing(UniSubscriber downstream) { upstream() .onItemOrFailure().invoke((item, failure) -> { synchronized (UniOnItemOrFailureSpy.this) { diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnItemSpy.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnItemSpy.java index d9b4cc84e..203509ce9 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnItemSpy.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnItemSpy.java @@ -1,7 +1,7 @@ package io.smallrye.mutiny.helpers.spies; import io.smallrye.mutiny.Uni; -import io.smallrye.mutiny.operators.UniSerializedSubscriber; +import io.smallrye.mutiny.subscription.UniSubscriber; public class UniOnItemSpy extends UniSpyBase { @@ -22,7 +22,7 @@ public void reset() { } @Override - protected void subscribing(UniSerializedSubscriber downstream) { + protected void subscribing(UniSubscriber downstream) { upstream() .onItem().invoke(item -> { this.lastItem = item; diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnSubscribeSpy.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnSubscribeSpy.java index e996b5235..f11a0f548 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnSubscribeSpy.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnSubscribeSpy.java @@ -1,7 +1,7 @@ package io.smallrye.mutiny.helpers.spies; import io.smallrye.mutiny.Uni; -import io.smallrye.mutiny.operators.UniSerializedSubscriber; +import io.smallrye.mutiny.subscription.UniSubscriber; import io.smallrye.mutiny.subscription.UniSubscription; public class UniOnSubscribeSpy extends UniSpyBase { @@ -23,7 +23,7 @@ public void reset() { } @Override - protected void subscribing(UniSerializedSubscriber downstream) { + protected void subscribing(UniSubscriber downstream) { upstream() .onSubscribe().invoke(uniSubscription -> { incrementInvocationCount(); diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnTerminationSpy.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnTerminationSpy.java index 1c5abe861..ed23fcb81 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnTerminationSpy.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnTerminationSpy.java @@ -1,7 +1,7 @@ package io.smallrye.mutiny.helpers.spies; import io.smallrye.mutiny.Uni; -import io.smallrye.mutiny.operators.UniSerializedSubscriber; +import io.smallrye.mutiny.subscription.UniSubscriber; import io.smallrye.mutiny.tuples.Tuple3; public class UniOnTerminationSpy extends UniSpyBase { @@ -31,7 +31,7 @@ public void reset() { } @Override - protected void subscribing(UniSerializedSubscriber downstream) { + protected void subscribing(UniSubscriber downstream) { upstream() .onTermination().invoke((i, f, c) -> { incrementInvocationCount(); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractUni.java b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractUni.java index 5f7d908b8..518bd8374 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractUni.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractUni.java @@ -13,7 +13,7 @@ public abstract class AbstractUni implements Uni { - protected abstract void subscribing(UniSerializedSubscriber subscriber); + protected abstract void subscribing(UniSubscriber subscriber); /** * Encapsulates subscription to slightly optimized the AbstractUni case. diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniAndCombination.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniAndCombination.java index 71dc4553a..b561f764e 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniAndCombination.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniAndCombination.java @@ -41,7 +41,7 @@ public UniAndCombination(Uni upstream, List> other } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { AndSupervisor andSupervisor = new AndSupervisor(subscriber); subscriber.onSubscribe(andSupervisor); // Must wait until the subscriber get a subscription before subscribing to the sources. @@ -51,11 +51,11 @@ protected void subscribing(UniSerializedSubscriber subscriber) { private class AndSupervisor implements UniSubscription { private final List handlers = new ArrayList<>(); - private final UniSerializedSubscriber subscriber; + private final UniSubscriber subscriber; AtomicBoolean cancelled = new AtomicBoolean(); - AndSupervisor(UniSerializedSubscriber sub) { + AndSupervisor(UniSubscriber sub) { subscriber = sub; for (Uni u : unis) { diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniCreateFromCompletionStage.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniCreateFromCompletionStage.java index a5bf96cbb..552a3a21b 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniCreateFromCompletionStage.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniCreateFromCompletionStage.java @@ -7,6 +7,7 @@ import java.util.function.Supplier; import io.smallrye.mutiny.helpers.ParameterValidation; +import io.smallrye.mutiny.subscription.UniSubscriber; public class UniCreateFromCompletionStage extends UniOperator { private final Supplier> supplier; @@ -17,7 +18,7 @@ public UniCreateFromCompletionStage(Supplier void forwardFromCompletionStage(CompletionStage stage, - UniSerializedSubscriber subscriber) { + UniSubscriber subscriber) { subscriber.onSubscribe(() -> stage.toCompletableFuture().cancel(false)); stage.whenComplete((res, fail) -> { if (fail != null) { @@ -33,7 +34,7 @@ private static void forwardFromCompletionStage(CompletionStage } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { CompletionStage stage; try { stage = supplier.get(); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniCreateFromDeferredSupplier.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniCreateFromDeferredSupplier.java index b15fac013..b3e42fbd5 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniCreateFromDeferredSupplier.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniCreateFromDeferredSupplier.java @@ -7,6 +7,7 @@ import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.ParameterValidation; +import io.smallrye.mutiny.subscription.UniSubscriber; public class UniCreateFromDeferredSupplier extends UniOperator { private final Supplier> supplier; @@ -17,7 +18,7 @@ public UniCreateFromDeferredSupplier(Supplier> supplier) { } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { nonNull(subscriber, "subscriber"); Uni uni; try { diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniCreateFromPublisher.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniCreateFromPublisher.java index 2b69da8d0..9886cb6e4 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniCreateFromPublisher.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniCreateFromPublisher.java @@ -10,6 +10,7 @@ import org.reactivestreams.Subscription; import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.mutiny.subscription.UniSubscriber; public class UniCreateFromPublisher extends UniOperator { private final Publisher publisher; @@ -21,7 +22,7 @@ public UniCreateFromPublisher(Publisher publisher) { @SuppressWarnings("SubscriberImplementation") @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { AtomicReference reference = new AtomicReference<>(); Subscriber actual = new Subscriber() { @Override diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniCreateWithEmitter.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniCreateWithEmitter.java index 4a032bb1e..d67d3c40a 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniCreateWithEmitter.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniCreateWithEmitter.java @@ -5,6 +5,7 @@ import java.util.function.Consumer; import io.smallrye.mutiny.subscription.UniEmitter; +import io.smallrye.mutiny.subscription.UniSubscriber; public class UniCreateWithEmitter extends AbstractUni { private final Consumer> consumer; @@ -14,7 +15,7 @@ public UniCreateWithEmitter(Consumer> consumer) { } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { DefaultUniEmitter emitter = new DefaultUniEmitter<>(subscriber); subscriber.onSubscribe(emitter); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniDelayOnItem.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniDelayOnItem.java index d32100d8b..8fcac80b0 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniDelayOnItem.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniDelayOnItem.java @@ -11,6 +11,7 @@ import java.util.concurrent.atomic.AtomicReference; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.UniSubscriber; import io.smallrye.mutiny.subscription.UniSubscription; public class UniDelayOnItem extends UniOperator { @@ -24,7 +25,7 @@ public UniDelayOnItem(Uni upstream, Duration duration, ScheduledExecutorServi } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { AtomicReference> holder = new AtomicReference<>(); AtomicReference reference = new AtomicReference<>(); AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber(subscriber) { diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniDelayUntil.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniDelayUntil.java index d1125291a..b5c786a0a 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniDelayUntil.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniDelayUntil.java @@ -8,6 +8,7 @@ import java.util.function.Function; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.UniSubscriber; import io.smallrye.mutiny.subscription.UniSubscription; public class UniDelayUntil extends UniOperator { @@ -22,7 +23,7 @@ public UniDelayUntil(Uni upstream, Function> function, } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { AtomicReference reference = new AtomicReference<>(); AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber(subscriber) { @Override diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniDelegatingSubscriber.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniDelegatingSubscriber.java index 1f8436144..8fb66ddbc 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniDelegatingSubscriber.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniDelegatingSubscriber.java @@ -7,9 +7,9 @@ public class UniDelegatingSubscriber implements UniSubscriber { - private final UniSerializedSubscriber delegate; + private final UniSubscriber delegate; - public UniDelegatingSubscriber(UniSerializedSubscriber subscriber) { + public UniDelegatingSubscriber(UniSubscriber subscriber) { this.delegate = nonNull(subscriber, "delegate"); } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniEmitOn.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniEmitOn.java index f9e2a2f71..362e1f878 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniEmitOn.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniEmitOn.java @@ -5,6 +5,7 @@ import java.util.concurrent.Executor; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.UniSubscriber; public class UniEmitOn extends UniOperator { private final Executor executor; @@ -15,7 +16,7 @@ public class UniEmitOn extends UniOperator { } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber(subscriber) { @Override public void onItem(I item) { diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniFailOnTimeout.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniFailOnTimeout.java index 2bba76197..ff987f097 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniFailOnTimeout.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniFailOnTimeout.java @@ -33,7 +33,7 @@ public UniFailOnTimeout(Uni upstream, Duration timeout, Supplier subscriber) { + protected void subscribing(UniSubscriber subscriber) { AtomicBoolean doneOrCancelled = new AtomicBoolean(); AtomicReference> task = new AtomicReference<>(); @@ -92,7 +92,7 @@ public void onFailure(Throwable failure) { }); } - private void sendTimeout(UniSerializedSubscriber subscriber, UniSubscription subscription) { + private void sendTimeout(UniSubscriber subscriber, UniSubscription subscription) { // Cancel the upstream subscription subscription.cancel(); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniMemoizeOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniMemoizeOp.java index 833d7e403..780ee8ba8 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniMemoizeOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniMemoizeOp.java @@ -28,8 +28,8 @@ private enum State { private final AtomicReference state = new AtomicReference<>(State.INIT); private final AtomicInteger wip = new AtomicInteger(); - private final List> awaitingSubscription = synchronizedList(new ArrayList<>()); - private final List> awaitingResult = synchronizedList(new ArrayList<>()); + private final List> awaitingSubscription = synchronizedList(new ArrayList<>()); + private final List> awaitingResult = synchronizedList(new ArrayList<>()); private volatile UniSubscription upstreamSubscription; private volatile I item; @@ -45,7 +45,7 @@ public UniMemoizeOp(Uni upstream, BooleanSupplier invalidationReque } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { if (invalidationRequested.getAsBoolean() && state.get() != State.SUBSCRIBING) { state.set(State.INIT); if (upstreamSubscription != null) { @@ -89,7 +89,7 @@ private void drain() { int missed = 1; for (;;) { - ArrayList> subscribers; + ArrayList> subscribers; I currentItem; Throwable currentFailure; @@ -100,7 +100,7 @@ private void drain() { } // Handle the subscribers that are awaiting a subscription - for (UniSerializedSubscriber subscriber : subscribers) { + for (UniSubscriber subscriber : subscribers) { currentItem = item; currentFailure = failure; State state = this.state.get(); @@ -135,7 +135,7 @@ private void drain() { subscribers = new ArrayList<>(awaitingResult); } // Handle the subscribers that are awaiting a result - for (UniSerializedSubscriber subscriber : subscribers) { + for (UniSubscriber subscriber : subscribers) { currentItem = item; currentFailure = failure; if (state.get() == State.CACHING) { @@ -156,7 +156,7 @@ private void drain() { } } - private void removeFromAwaitingLists(UniSerializedSubscriber subscriber) { + private void removeFromAwaitingLists(UniSubscriber subscriber) { awaitingSubscription.remove(subscriber); awaitingResult.remove(subscriber); drain(); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniNever.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniNever.java index adaeef625..1ce316eae 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniNever.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniNever.java @@ -2,6 +2,8 @@ import static io.smallrye.mutiny.helpers.EmptyUniSubscription.CANCELLED; +import io.smallrye.mutiny.subscription.UniSubscriber; + public class UniNever extends AbstractUni { public static final UniNever INSTANCE = new UniNever<>(); @@ -10,7 +12,7 @@ private UniNever() { } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { subscriber.onSubscribe(CANCELLED); } } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnCancellation.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnCancellation.java index 09fb58d1e..f4a5a37cc 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnCancellation.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnCancellation.java @@ -3,6 +3,7 @@ import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.UniSubscriber; import io.smallrye.mutiny.subscription.UniSubscription; public class UniOnCancellation extends UniOperator { @@ -14,7 +15,7 @@ public UniOnCancellation(Uni upstream, Runnable callback) { } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber(subscriber) { @Override public void onSubscribe(UniSubscription subscription) { diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnCancellationCall.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnCancellationCall.java index ca588f62d..576f0f728 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnCancellationCall.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnCancellationCall.java @@ -6,6 +6,7 @@ import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.mutiny.subscription.UniSubscriber; import io.smallrye.mutiny.subscription.UniSubscription; public class UniOnCancellationCall extends UniOperator { @@ -18,7 +19,7 @@ public UniOnCancellationCall(Uni upstream, Supplier> supplie } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { upstream().subscribe().withSubscriber(new UniDelegatingSubscriber(subscriber) { @Override diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnFailureFlatMap.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnFailureFlatMap.java index 353ea04b3..4c6528f78 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnFailureFlatMap.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnFailureFlatMap.java @@ -7,6 +7,7 @@ import io.smallrye.mutiny.CompositeException; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.UniSubscriber; import io.smallrye.mutiny.subscription.UniSubscription; public class UniOnFailureFlatMap extends UniOperator { @@ -23,7 +24,7 @@ public UniOnFailureFlatMap(Uni upstream, } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { UniOnItemTransformToUni.FlatMapSubscription flatMapSubscription = new UniOnItemTransformToUni.FlatMapSubscription(); // Subscribe to the source. AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber(subscriber) { diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnFailureTransform.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnFailureTransform.java index 4a358838c..5a1e6abf3 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnFailureTransform.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnFailureTransform.java @@ -8,6 +8,7 @@ import io.smallrye.mutiny.CompositeException; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.UniSubscriber; public class UniOnFailureTransform extends UniOperator { @@ -23,7 +24,7 @@ public UniOnFailureTransform(Uni upstream, } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber(subscriber) { @Override diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemConsume.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemConsume.java index fa31d516b..0ee4d31b2 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemConsume.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemConsume.java @@ -5,6 +5,7 @@ import io.smallrye.mutiny.CompositeException; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.UniSubscriber; public class UniOnItemConsume extends UniOperator { @@ -22,7 +23,7 @@ public UniOnItemConsume(Uni upstream, } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber(subscriber) { @Override public void onItem(T item) { @@ -55,7 +56,7 @@ public void onFailure(Throwable failure) { } private boolean invokeEventHandler(Consumer handler, E event, boolean wasCalledByOnFailure, - UniSerializedSubscriber subscriber) { + UniSubscriber subscriber) { if (handler != null) { try { handler.accept(event); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemOrFailureConsume.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemOrFailureConsume.java index 137078845..6f330eec2 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemOrFailureConsume.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemOrFailureConsume.java @@ -4,6 +4,7 @@ import io.smallrye.mutiny.CompositeException; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.UniSubscriber; public class UniOnItemOrFailureConsume extends UniOperator { @@ -16,7 +17,7 @@ public UniOnItemOrFailureConsume(Uni upstream, } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber(subscriber) { @Override public void onItem(T item) { @@ -34,7 +35,7 @@ public void onFailure(Throwable failure) { }); } - private boolean invokeCallback(T item, Throwable failure, UniSerializedSubscriber subscriber) { + private boolean invokeCallback(T item, Throwable failure, UniSubscriber subscriber) { try { callback.accept(item, failure); return true; diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemOrFailureFlatMap.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemOrFailureFlatMap.java index ae4d3d3a8..fb06dfe40 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemOrFailureFlatMap.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemOrFailureFlatMap.java @@ -7,6 +7,7 @@ import io.smallrye.mutiny.CompositeException; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.UniSubscriber; import io.smallrye.mutiny.subscription.UniSubscription; public class UniOnItemOrFailureFlatMap extends UniOperator { @@ -22,7 +23,7 @@ public UniOnItemOrFailureFlatMap(Uni upstream, public static void invokeAndSubstitute(BiFunction> mapper, I item, Throwable failure, - UniSerializedSubscriber subscriber, + UniSubscriber subscriber, UniOnItemTransformToUni.FlatMapSubscription flatMapSubscription) { Uni outcome; try { @@ -42,7 +43,7 @@ public static void invokeAndSubstitute(BiFunction subscriber) { + protected void subscribing(UniSubscriber subscriber) { UniOnItemTransformToUni.FlatMapSubscription flatMapSubscription = new UniOnItemTransformToUni.FlatMapSubscription(); // Subscribe to the source. AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber(subscriber) { diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemOrFailureMap.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemOrFailureMap.java index 29379cd66..5bba11d25 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemOrFailureMap.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemOrFailureMap.java @@ -1,10 +1,13 @@ package io.smallrye.mutiny.operators; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import io.smallrye.mutiny.CompositeException; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.ParameterValidation; +import io.smallrye.mutiny.subscription.UniSubscriber; +import io.smallrye.mutiny.subscription.UniSubscription; public class UniOnItemOrFailureMap extends UniOperator { @@ -16,12 +19,22 @@ public UniOnItemOrFailureMap(Uni source, BiFunction subscriber) { - AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber(subscriber) { + protected void subscribing(UniSubscriber downstream) { + AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber(downstream) { + + private final AtomicBoolean done = new AtomicBoolean(); + + @Override + public void onSubscribe(UniSubscription subscription) { + super.onSubscribe(() -> { + done.set(true); + subscription.cancel(); + }); + } @Override public void onItem(I item) { - if (!subscriber.isCancelledOrDone()) { + if (done.compareAndSet(false, true)) { O outcome; try { outcome = mapper.apply(item, null); @@ -29,17 +42,17 @@ public void onItem(I item) { // it would be caught and onFailure would be called. This would be illegal. } catch (Throwable e) { // NOSONAR // Be sure to not call the mapper again with the failure. - subscriber.onFailure(e); + downstream.onFailure(e); return; } - subscriber.onItem(outcome); + downstream.onItem(outcome); } } @Override public void onFailure(Throwable failure) { - if (!subscriber.isCancelledOrDone()) { + if (done.compareAndSet(false, true)) { O outcome; try { outcome = mapper.apply(null, failure); @@ -47,11 +60,11 @@ public void onFailure(Throwable failure) { // it would be caught and onFailure would be called. This would be illegal. } catch (Throwable e) { // NOSONAR // Be sure to not call the mapper again with the failure. - subscriber.onFailure(new CompositeException(failure, e)); + downstream.onFailure(new CompositeException(failure, e)); return; } - subscriber.onItem(outcome); + downstream.onItem(outcome); } } }); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemTransform.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemTransform.java index 24ca4b1bf..ea9fa10c8 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemTransform.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemTransform.java @@ -4,6 +4,7 @@ import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.ParameterValidation; +import io.smallrye.mutiny.subscription.UniSubscriber; public class UniOnItemTransform extends UniOperator { @@ -15,7 +16,7 @@ public UniOnItemTransform(Uni source, Function mapper } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber(subscriber) { @Override diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemTransformToUni.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemTransformToUni.java index 8a1e9fa2f..98c791d26 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemTransformToUni.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnItemTransformToUni.java @@ -24,7 +24,7 @@ public UniOnItemTransformToUni(Uni upstream, Function void invokeAndSubstitute(Function> mapper, I input, - UniSerializedSubscriber subscriber, + UniSubscriber subscriber, FlatMapSubscription flatMapSubscription) { Uni outcome; try { @@ -43,7 +43,7 @@ public static void invokeAndSubstitute(Function void handleInnerSubscription(UniSerializedSubscriber subscriber, + public static void handleInnerSubscription(UniSubscriber subscriber, UniOnItemTransformToUni.FlatMapSubscription flatMapSubscription, Uni outcome) { if (outcome == null) { subscriber.onFailure(new NullPointerException(MAPPER_RETURNED_NULL)); @@ -59,7 +59,7 @@ public void onSubscribe(UniSubscription secondSubscription) { } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { FlatMapSubscription flatMapSubscription = new FlatMapSubscription(); // Subscribe to the source. AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber(subscriber) { diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnSubscribeCall.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnSubscribeCall.java index f37dc8f40..5614035dd 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnSubscribeCall.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnSubscribeCall.java @@ -7,6 +7,7 @@ import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.EmptyUniSubscription; import io.smallrye.mutiny.helpers.ParameterValidation; +import io.smallrye.mutiny.subscription.UniSubscriber; import io.smallrye.mutiny.subscription.UniSubscription; public class UniOnSubscribeCall extends UniOperator { @@ -20,7 +21,7 @@ public UniOnSubscribeCall(Uni upstream, } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber(subscriber) { // As subscription might be delayed, we need to store the event provided by the upstream diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnSubscribeInvoke.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnSubscribeInvoke.java index 1494845cc..c054a88db 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnSubscribeInvoke.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnSubscribeInvoke.java @@ -5,6 +5,7 @@ import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.EmptyUniSubscription; import io.smallrye.mutiny.helpers.ParameterValidation; +import io.smallrye.mutiny.subscription.UniSubscriber; import io.smallrye.mutiny.subscription.UniSubscription; public class UniOnSubscribeInvoke extends UniOperator { @@ -18,7 +19,7 @@ public UniOnSubscribeInvoke(Uni upstream, } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber(subscriber) { @Override public void onSubscribe(UniSubscription subscription) { diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnTermination.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnTermination.java index 3ceb9ab13..fb3b08699 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnTermination.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnTermination.java @@ -4,6 +4,7 @@ import io.smallrye.mutiny.CompositeException; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.UniSubscriber; import io.smallrye.mutiny.subscription.UniSubscription; import io.smallrye.mutiny.tuples.Functions; @@ -16,7 +17,7 @@ public UniOnTermination(Uni upstream, Functions.TriConsumer subscriber) { + protected void subscribing(UniSubscriber subscriber) { AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber(subscriber) { @Override diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnTerminationCall.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnTerminationCall.java index 6167d2651..a87f7d9fa 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnTerminationCall.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOnTerminationCall.java @@ -9,6 +9,7 @@ import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.subscription.Cancellable; +import io.smallrye.mutiny.subscription.UniSubscriber; import io.smallrye.mutiny.subscription.UniSubscription; import io.smallrye.mutiny.tuples.Functions; @@ -23,7 +24,7 @@ public UniOnTerminationCall(Uni upstream, } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { upstream().subscribe().withSubscriber(new UniDelegatingSubscriber(subscriber) { private volatile Cancellable cancellable; diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOrCombination.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOrCombination.java index 56f86d189..98ca87c5a 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniOrCombination.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniOrCombination.java @@ -33,7 +33,7 @@ public UniOrCombination(Uni[] array) { @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { if (challengers.isEmpty()) { subscriber.onSubscribe(EmptyUniSubscription.CANCELLED); subscriber.onItem(null); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniRetryAtMost.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniRetryAtMost.java index ef9063bc9..708fd0654 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniRetryAtMost.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniRetryAtMost.java @@ -23,7 +23,7 @@ public UniRetryAtMost(Uni upstream, Predicate predicate, l } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { AtomicInteger numberOfSubscriptions = new AtomicInteger(0); UniSubscriber retryingSubscriber = new UniSubscriber() { final AtomicReference reference = new AtomicReference<>(); @@ -79,7 +79,7 @@ private void resubscribe(Uni upstream, UniSubscriber subscriber) } private boolean test( - UniSerializedSubscriber subscriber, Throwable failure) { + UniSubscriber subscriber, Throwable failure) { boolean pass; try { pass = predicate.test(failure); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/UniRunSubscribeOn.java b/implementation/src/main/java/io/smallrye/mutiny/operators/UniRunSubscribeOn.java index 0d3548e7a..d88b7d802 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/UniRunSubscribeOn.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/UniRunSubscribeOn.java @@ -20,7 +20,7 @@ public UniRunSubscribeOn(Uni upstream, Executor executor) { } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { SubscribeOnUniSubscriber downstream = new SubscribeOnUniSubscriber(subscriber); try { executor.execute(downstream); @@ -36,7 +36,7 @@ class SubscribeOnUniSubscriber extends UniDelegatingSubscriber final AtomicReference subscription = new AtomicReference<>(); - SubscribeOnUniSubscriber(UniSerializedSubscriber actual) { + SubscribeOnUniSubscriber(UniSubscriber actual) { super(actual); } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/builders/KnownFailureUni.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/builders/KnownFailureUni.java index f23b73c24..9d64498a6 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/builders/KnownFailureUni.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/builders/KnownFailureUni.java @@ -2,7 +2,7 @@ import io.smallrye.mutiny.helpers.EmptyUniSubscription; import io.smallrye.mutiny.operators.AbstractUni; -import io.smallrye.mutiny.operators.UniSerializedSubscriber; +import io.smallrye.mutiny.subscription.UniSubscriber; /** * Specialized {@link io.smallrye.mutiny.Uni} implementation for the case where the failure is known. @@ -19,7 +19,7 @@ public KnownFailureUni(Throwable failure) { } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { subscriber.onSubscribe(EmptyUniSubscription.CANCELLED); subscriber.onFailure(failure); } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/builders/KnownItemUni.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/builders/KnownItemUni.java index 4d480b3d6..3d04b01ab 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/builders/KnownItemUni.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/builders/KnownItemUni.java @@ -2,7 +2,7 @@ import io.smallrye.mutiny.helpers.EmptyUniSubscription; import io.smallrye.mutiny.operators.AbstractUni; -import io.smallrye.mutiny.operators.UniSerializedSubscriber; +import io.smallrye.mutiny.subscription.UniSubscriber; /** * Specialized {@link io.smallrye.mutiny.Uni} implementation for the case where the item is known. @@ -19,7 +19,7 @@ public KnownItemUni(T item) { } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { // No need to track cancellation, it's done by the serialized subscriber downstream. subscriber.onSubscribe(EmptyUniSubscription.CANCELLED); subscriber.onItem(item); diff --git a/implementation/src/test/java/io/smallrye/mutiny/infrastructure/UniInterceptorTest.java b/implementation/src/test/java/io/smallrye/mutiny/infrastructure/UniInterceptorTest.java index 6b30e1a22..9c7aef51b 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/infrastructure/UniInterceptorTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/infrastructure/UniInterceptorTest.java @@ -8,7 +8,6 @@ import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.operators.AbstractUni; import io.smallrye.mutiny.operators.UniDelegatingSubscriber; -import io.smallrye.mutiny.operators.UniSerializedSubscriber; import io.smallrye.mutiny.subscription.UniSubscriber; import io.smallrye.mutiny.subscription.UniSubscription; @@ -62,7 +61,7 @@ public void testCreationInterception() { public Uni onUniCreation(Uni uni) { return new AbstractUni() { @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { assertThat(creationTime).isLessThan(System.nanoTime()); uni.subscribe().withSubscriber(new UniDelegatingSubscriber(subscriber) { @Override @@ -89,7 +88,7 @@ public void testCreationInterceptionWithMap() { public Uni onUniCreation(Uni uni) { return new AbstractUni() { @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { assertThat(creationTime).isLessThan(System.nanoTime()); uni.subscribe().withSubscriber(new UniDelegatingSubscriber(subscriber) { @Override diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/UniPlugTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/UniPlugTest.java index 5c31e3b22..7979177d6 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/UniPlugTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/UniPlugTest.java @@ -7,6 +7,7 @@ import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.test.UniAssertSubscriber; +import io.smallrye.mutiny.subscription.UniSubscriber; class UniPlugTest { @@ -57,7 +58,7 @@ public Greeter(Uni upstream) { } @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { upstream().subscribe().withSubscriber(new UniDelegatingSubscriber(subscriber) { @Override public void onItem(T item) { diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/UniRunSubscriptionOnTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/UniRunSubscriptionOnTest.java index 6f89cf5d4..b7ee1a1f5 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/UniRunSubscriptionOnTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/UniRunSubscriptionOnTest.java @@ -14,6 +14,7 @@ import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.test.UniAssertSubscriber; import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.mutiny.subscription.UniSubscriber; public class UniRunSubscriptionOnTest { @@ -140,7 +141,7 @@ public void testSubscriptionFailing() { UniAssertSubscriber subscriber = new AbstractUni() { @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { throw new IllegalArgumentException("boom"); } } diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/UniSerializedSubscriberTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/UniSerializedSubscriberTest.java index c53210146..e4f6f1bee 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/UniSerializedSubscriberTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/UniSerializedSubscriberTest.java @@ -91,7 +91,7 @@ public void testNormalWithNullItem() { public void testRogueUpstreamSendingFailureBeforeSubscription() { AbstractUni rogue = new AbstractUni() { @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { subscriber.onFailure(new IOException("boom")); subscriber.onSubscribe(() -> { }); @@ -111,7 +111,7 @@ protected void subscribing(UniSerializedSubscriber subscriber) public void testRogueUpstreamSendingItemBeforeSubscription() { AbstractUni rogue = new AbstractUni() { @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { subscriber.onItem(1); subscriber.onSubscribe(() -> { }); @@ -131,7 +131,7 @@ protected void subscribing(UniSerializedSubscriber subscriber) public void testInvalidStateWhenOnSubscribeIsCalled() { AbstractUni rogue = new AbstractUni() { @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { // Do nothing } }; @@ -151,7 +151,7 @@ protected void subscribing(UniSerializedSubscriber subscriber) public void testRogueUpstreamSendingMultipleItems() { AbstractUni rogue = new AbstractUni() { @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { subscriber.onSubscribe(() -> { }); subscriber.onItem(1); @@ -382,10 +382,10 @@ public void testDroppedExceptionsWhenOnFailureCalledMultipleTimes() { AtomicReference captured = new AtomicReference<>(); Infrastructure.setDroppedExceptionHandler(captured::set); - AtomicReference> sub = new AtomicReference<>(); + AtomicReference> sub = new AtomicReference<>(); AbstractUni uni = new AbstractUni() { @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { sub.set(subscriber); } }; @@ -424,10 +424,10 @@ public void testDroppedExceptionsWhenOnFailureCalledAfterOnItem() { AtomicReference captured = new AtomicReference<>(); Infrastructure.setDroppedExceptionHandler(captured::set); - AtomicReference> sub = new AtomicReference<>(); + AtomicReference> sub = new AtomicReference<>(); AbstractUni uni = new AbstractUni() { @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { sub.set(subscriber); } }; @@ -463,10 +463,10 @@ public void testDroppedExceptionsWhenOnFailureCalledAfterCancellation() { AtomicReference captured = new AtomicReference<>(); Infrastructure.setDroppedExceptionHandler(captured::set); - AtomicReference> sub = new AtomicReference<>(); + AtomicReference> sub = new AtomicReference<>(); AbstractUni uni = new AbstractUni() { @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { sub.set(subscriber); } }; @@ -488,7 +488,6 @@ public void onFailure(Throwable failure) { } }); sub.get().onSubscribe(mock(UniSubscription.class)); - sub.get().cancel(); sub.get().onFailure(new IllegalStateException("boom")); assertThat(captured.get()).isNotNull() From 9985faf7359dff837e8f5a7c2660d06280625886 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 7 Dec 2020 21:39:42 +0100 Subject: [PATCH 2/4] RevApi document non-breaking-breaking changes in the Uni spies APIs --- implementation/revapi.json | 43 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/implementation/revapi.json b/implementation/revapi.json index 2e02bd401..60094e6f3 100644 --- a/implementation/revapi.json +++ b/implementation/revapi.json @@ -14,6 +14,7 @@ "io\\.smallrye\\.mutiny\\..*" ], "exclude": [ + "io\\.smallrye\\.mutiny\\.operators\\.*", "io\\.smallrye\\.mutiny\\.operators\\..*" ] } @@ -32,6 +33,48 @@ "old": "method io.smallrye.mutiny.helpers.Subscriptions.SingleItemSubscription io.smallrye.mutiny.helpers.Subscriptions::single(org.reactivestreams.Subscriber, T)", "new": "method org.reactivestreams.Subscription io.smallrye.mutiny.helpers.Subscriptions::single(org.reactivestreams.Subscriber, T)", "justification": "SingleItemSubscription should not be exposed, and should be used as a regular Subscription" + }, + { + "code": "java.method.parameterTypeChanged", + "old": "parameter void io.smallrye.mutiny.helpers.spies.UniGlobalSpy::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber===)", + "new": "parameter void io.smallrye.mutiny.helpers.spies.UniGlobalSpy::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber===)", + "justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API" + }, + { + "code": "java.method.parameterTypeChanged", + "old": "parameter void io.smallrye.mutiny.helpers.spies.UniOnCancellationSpy::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber===)", + "new": "parameter void io.smallrye.mutiny.helpers.spies.UniOnCancellationSpy::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber===)", + "justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API" + }, + { + "code": "java.method.parameterTypeChanged", + "old": "parameter void io.smallrye.mutiny.helpers.spies.UniOnFailureSpy::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber===)", + "new": "parameter void io.smallrye.mutiny.helpers.spies.UniOnFailureSpy::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber===)", + "justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API" + }, + { + "code": "java.method.parameterTypeChanged", + "old": "parameter void io.smallrye.mutiny.helpers.spies.UniOnItemOrFailureSpy::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber===)", + "new": "parameter void io.smallrye.mutiny.helpers.spies.UniOnItemOrFailureSpy::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber===)", + "justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API" + }, + { + "code": "java.method.parameterTypeChanged", + "old": "parameter void io.smallrye.mutiny.helpers.spies.UniOnItemSpy::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber===)", + "new": "parameter void io.smallrye.mutiny.helpers.spies.UniOnItemSpy::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber===)", + "justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API" + }, + { + "code": "java.method.parameterTypeChanged", + "old": "parameter void io.smallrye.mutiny.helpers.spies.UniOnSubscribeSpy::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber===)", + "new": "parameter void io.smallrye.mutiny.helpers.spies.UniOnSubscribeSpy::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber===)", + "justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API" + }, + { + "code": "java.method.parameterTypeChanged", + "old": "parameter void io.smallrye.mutiny.helpers.spies.UniOnTerminationSpy::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber===)", + "new": "parameter void io.smallrye.mutiny.helpers.spies.UniOnTerminationSpy::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber===)", + "justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API" } ] } From cd7f54c926007cee093a98fac71b1f6ffe51b872 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 7 Dec 2020 22:07:32 +0100 Subject: [PATCH 3/4] ContextPropagationUniInterceptor fix to UniSubscriber --- .../mutiny/context/ContextPropagationUniInterceptor.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/context-propagation/src/main/java/io/smallrye/mutiny/context/ContextPropagationUniInterceptor.java b/context-propagation/src/main/java/io/smallrye/mutiny/context/ContextPropagationUniInterceptor.java index 4895d3caa..29164995e 100644 --- a/context-propagation/src/main/java/io/smallrye/mutiny/context/ContextPropagationUniInterceptor.java +++ b/context-propagation/src/main/java/io/smallrye/mutiny/context/ContextPropagationUniInterceptor.java @@ -7,7 +7,6 @@ import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.infrastructure.UniInterceptor; import io.smallrye.mutiny.operators.AbstractUni; -import io.smallrye.mutiny.operators.UniSerializedSubscriber; import io.smallrye.mutiny.subscription.UniSubscriber; import io.smallrye.mutiny.subscription.UniSubscription; @@ -44,7 +43,7 @@ public Uni onUniCreation(Uni uni) { Executor executor = getThreadContext().currentContextExecutor(); return new AbstractUni() { @Override - protected void subscribing(UniSerializedSubscriber subscriber) { + protected void subscribing(UniSubscriber subscriber) { executor.execute(() -> AbstractUni.subscribe(uni, subscriber)); } }; From cd2831e048069cf2cb412206b68e3fbcf6c2d30c Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 7 Dec 2020 22:07:42 +0100 Subject: [PATCH 4/4] RevApi fixes --- implementation/revapi.json | 6 ++++++ reactor/revapi.json | 12 ++++++++---- rxjava/revapi.json | 12 ++++++++---- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/implementation/revapi.json b/implementation/revapi.json index 60094e6f3..cc72fc242 100644 --- a/implementation/revapi.json +++ b/implementation/revapi.json @@ -75,6 +75,12 @@ "old": "parameter void io.smallrye.mutiny.helpers.spies.UniOnTerminationSpy::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber===)", "new": "parameter void io.smallrye.mutiny.helpers.spies.UniOnTerminationSpy::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber===)", "justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API" + }, + { + "code": "java.method.parameterTypeChanged", + "old": "parameter void io.smallrye.mutiny.operators.AbstractUni::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber===)", + "new": "parameter void io.smallrye.mutiny.operators.AbstractUni::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber===)", + "justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API" } ] } diff --git a/reactor/revapi.json b/reactor/revapi.json index 6be0092b4..013c5e2af 100644 --- a/reactor/revapi.json +++ b/reactor/revapi.json @@ -10,7 +10,13 @@ "filter": { "packages": { "regex": true, - "include": ["io\\.smallrye\\.mutiny\\..*"] + "include": [ + "io\\.smallrye\\.mutiny\\..*" + ], + "exclude": [ + "io\\.smallrye\\.mutiny\\.operators\\.*", + "io\\.smallrye\\.mutiny\\.operators\\..*" + ] } } } @@ -21,9 +27,7 @@ "id": "breaking-changes", "configuration": { "criticality": "highlight", - "differences": [ - - ] + "differences": [] } }, diff --git a/rxjava/revapi.json b/rxjava/revapi.json index 6be0092b4..013c5e2af 100644 --- a/rxjava/revapi.json +++ b/rxjava/revapi.json @@ -10,7 +10,13 @@ "filter": { "packages": { "regex": true, - "include": ["io\\.smallrye\\.mutiny\\..*"] + "include": [ + "io\\.smallrye\\.mutiny\\..*" + ], + "exclude": [ + "io\\.smallrye\\.mutiny\\.operators\\.*", + "io\\.smallrye\\.mutiny\\.operators\\..*" + ] } } } @@ -21,9 +27,7 @@ "id": "breaking-changes", "configuration": { "criticality": "highlight", - "differences": [ - - ] + "differences": [] } },