From 8ad84683224adc6096f2591b1b2bfac612fc26e4 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 17 May 2021 11:33:25 +0200 Subject: [PATCH 1/4] Deprecate onSubscribe() in favor or onSubscription() --- .../context/MultiContextPropagationTest.java | 2 +- .../src/main/jekyll/guides/logging.adoc | 4 +-- .../src/test/java/guides/EventsTest.java | 2 +- .../src/test/java/guides/ObserveTest.java | 2 +- implementation/revapi.json | 15 +++++++- .../main/java/io/smallrye/mutiny/Multi.java | 20 +++++++++++ .../src/main/java/io/smallrye/mutiny/Uni.java | 22 ++++++++++++ .../mutiny/groups/MultiOnSubscribe.java | 4 +-- .../mutiny/groups/UniOnSubscribe.java | 4 +-- .../helpers/spies/MultiOnSubscribeSpy.java | 2 +- .../io/smallrye/mutiny/helpers/spies/Spy.java | 4 +-- .../helpers/spies/UniOnSubscribeSpy.java | 2 +- .../mutiny/operators/AbstractMulti.java | 5 +++ .../mutiny/operators/AbstractUni.java | 5 +++ .../subscription/UniSerializedSubscriber.java | 2 +- .../mutiny/groups/MultiDisjointTest.java | 6 ++-- .../mutiny/helpers/spies/SpyTest.java | 4 +-- .../helpers/test/AssertSubscriberTest.java | 6 ++-- .../helpers/test/UniAssertSubscriberTest.java | 6 ++-- .../mutiny/operators/MultiOnEventTest.java | 18 +++++----- .../operators/MultiOnFailureRetryTest.java | 2 +- .../MultiOnFailureRetryWhenTest.java | 10 +++--- .../mutiny/operators/MultiOnFailureTest.java | 4 +-- .../mutiny/operators/MultiOnRequestTest.java | 2 +- .../operators/MultiOnSubscribeTest.java | 30 ++++++++-------- .../MultiOnTerminationUniInvokeTest.java | 10 +++--- .../operators/MultiTransformToMultiTest.java | 4 +-- .../mutiny/operators/UniOnEventTest.java | 30 ++++++++-------- .../mutiny/operators/UniOnSubscribeTest.java | 36 +++++++++---------- .../smallrye/mutiny/operators/UniZipTest.java | 20 +++++------ .../MultiFromResourceFromUniTest.java | 22 ++++++------ .../multi/builders/MultiFromResourceTest.java | 22 ++++++------ ...va => MultiOnSubscriptionCallTckTest.java} | 6 ++-- ... => MultiOnSubscriptionInvokeTckTest.java} | 6 ++-- 34 files changed, 202 insertions(+), 137 deletions(-) rename implementation/src/test/java/tck/{MultiOnSubscribeCallTckTest.java => MultiOnSubscriptionCallTckTest.java} (57%) rename implementation/src/test/java/tck/{MultiOnSubscribeInvokeTckTest.java => MultiOnSubscriptionInvokeTckTest.java} (67%) diff --git a/context-propagation/src/test/java/io/smallrye/mutiny/context/MultiContextPropagationTest.java b/context-propagation/src/test/java/io/smallrye/mutiny/context/MultiContextPropagationTest.java index 4c41a7cda..1b8f6a8d4 100644 --- a/context-propagation/src/test/java/io/smallrye/mutiny/context/MultiContextPropagationTest.java +++ b/context-propagation/src/test/java/io/smallrye/mutiny/context/MultiContextPropagationTest.java @@ -237,7 +237,7 @@ public void testOnSubscribe() { AssertSubscriber subscriber = Multi.createFrom().ticks().every(Duration.ofMillis(1)) .select().first(5) - .onSubscribe().invoke(() -> { + .onSubscription().invoke(() -> { assertThat(ctx).isSameAs(MyContext.get()); MyContext.get().set("test"); }) diff --git a/documentation/src/main/jekyll/guides/logging.adoc b/documentation/src/main/jekyll/guides/logging.adoc index a4c7e8b4d..c4015f0a1 100644 --- a/documentation/src/main/jekyll/guides/logging.adoc +++ b/documentation/src/main/jekyll/guides/logging.adoc @@ -22,7 +22,7 @@ Here the `log` operator traces all events between the `onItem().transform(...)` [source,text] ---- -11:01:48.709 [main] INFO Multi.MultiMapOp.0 - onSubscribe() +11:01:48.709 [main] INFO Multi.MultiMapOp.0 - onSubscription() 11:01:48.711 [main] INFO Multi.MultiMapOp.0 - request(9223372036854775807) 11:01:48.711 [main] INFO Multi.MultiMapOp.0 - onItem(10) >>> 10 @@ -46,7 +46,7 @@ Events are written by default to the standard console output in a format similar [source,text] ---- -[--> Multi.MultiMapOp.0 | onSubscribe() +[--> Multi.MultiMapOp.0 | onSubscription() [--> Multi.MultiMapOp.0 | request(9223372036854775807) [--> Multi.MultiMapOp.0 | onItem(10) [--> Multi.MultiMapOp.0 | onItem(20) diff --git a/documentation/src/test/java/guides/EventsTest.java b/documentation/src/test/java/guides/EventsTest.java index 534a3ae2f..55a01858b 100644 --- a/documentation/src/test/java/guides/EventsTest.java +++ b/documentation/src/test/java/guides/EventsTest.java @@ -29,7 +29,7 @@ public void test(SystemOut out) { .invoke(failure -> log("Failed with " + failure)) .onCompletion() // Called when the stream completes .invoke(() -> log("Completed")) - .onSubscribe() // Called when the upstream is ready + .onSubscription() // Called when the upstream is ready .invoke(subscription -> log("We are subscribed!")) .onCancellation() // Called when the downstream cancels .invoke(() -> log("Cancelled :-(")) diff --git a/documentation/src/test/java/guides/ObserveTest.java b/documentation/src/test/java/guides/ObserveTest.java index c98078ca9..e4c2b15e1 100644 --- a/documentation/src/test/java/guides/ObserveTest.java +++ b/documentation/src/test/java/guides/ObserveTest.java @@ -40,7 +40,7 @@ public void all() { Multi multi = Multi.createFrom().items("a", "b", "c"); // tag::invoke-all[] multi - .onSubscribe().invoke(() -> System.out.println("⬇️ Subscribed")) + .onSubscription().invoke(() -> System.out.println("⬇️ Subscribed")) .onItem().invoke(i -> System.out.println("⬇️ Received item: " + i)) .onFailure().invoke(f -> System.out.println("⬇️ Failed with " + f)) .onCompletion().invoke(() -> System.out.println("⬇️ Completed")) diff --git a/implementation/revapi.json b/implementation/revapi.json index 4e3bf76ef..b35adb758 100644 --- a/implementation/revapi.json +++ b/implementation/revapi.json @@ -21,7 +21,20 @@ "criticality" : "highlight", "minSeverity" : "POTENTIALLY_BREAKING", "minCriticality" : "documented", - "differences" : [ ] + "differences" : [ + { + "ignore": true, + "code": "java.method.addedToInterface", + "new": "method io.smallrye.mutiny.groups.MultiOnSubscribe io.smallrye.mutiny.Multi::onSubscription()", + "justification": "Deprecated API" + }, + { + "ignore": true, + "code": "java.method.addedToInterface", + "new": "method io.smallrye.mutiny.groups.UniOnSubscribe io.smallrye.mutiny.Uni::onSubscription()", + "justification": "Deprecated API" + } + ] } }, { "extension" : "revapi.reporter.json", diff --git a/implementation/src/main/java/io/smallrye/mutiny/Multi.java b/implementation/src/main/java/io/smallrye/mutiny/Multi.java index 59bf83356..65c62822c 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/Multi.java +++ b/implementation/src/main/java/io/smallrye/mutiny/Multi.java @@ -127,9 +127,29 @@ default O stage(Function, O> stage) { * * * @return the object to configure the action to execution on subscription. + * @deprecated use {@link #onSubscription()} instead */ + @Deprecated MultiOnSubscribe onSubscribe(); + /** + * Configures the action to execute when the observed {@link Multi} sends a {@link Subscription}. + * The downstream don't have a subscription yet. It will be passed once the configured action completes. + *

+ * For example: + * + *

+     * {@code
+     * multi.onSubscription().invoke(sub -> System.out.println("subscribed"));
+     * // Delay the subscription by 1 second (or until an asynchronous action completes)
+     * multi.onSubscription().call(sub -> Uni.createFrom(1).onItem().delayIt().by(Duration.ofSecond(1)));
+     * }
+     * 
+ * + * @return the object to configure the action to execution on subscription. + */ + MultiOnSubscribe onSubscription(); + /** * Configures a type of failure filtering the failures on which the behavior (specified with the returned * {@link MultiOnFailure}) is applied. diff --git a/implementation/src/main/java/io/smallrye/mutiny/Uni.java b/implementation/src/main/java/io/smallrye/mutiny/Uni.java index b44ac8090..1ad15012f 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/Uni.java +++ b/implementation/src/main/java/io/smallrye/mutiny/Uni.java @@ -194,9 +194,31 @@ default CompletableFuture subscribeAsCompletionStage() { * * * @return the object to configure the action to execution on subscription. + * @deprecated use {@link #onSubscription()} instead */ + @Deprecated UniOnSubscribe onSubscribe(); + /** + * Configures the action to execute when the observed {@link Uni} sends a {@link UniSubscription}. + * The downstream don't have a subscription yet. It will be passed once the configured action completes. + * + *

+ * Example: + *

+ * + *
+     * {@code
+     * uni.onSubscription().invoke(sub -> System.out.println("subscribed"));
+     * // Delay the subscription by 1 second (or until an asynchronous action completes)
+     * uni.onSubscription().call(sub -> Uni.createFrom(1).onItem().delayIt().by(Duration.ofSecond(1)));
+     * }
+     * 
+ * + * @return the object to configure the action to execution on subscription. + */ + UniOnSubscribe onSubscription(); + /** * Configures the action to execute when the observed {@link Uni} emits either an item (potentially {@code null})) * or a failure. Unlike {@link #onItem()} and {@link #onFailure()} the action would handle both cases in on "go". diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiOnSubscribe.java b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiOnSubscribe.java index 02ad7315d..c5c903ad9 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiOnSubscribe.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiOnSubscribe.java @@ -22,9 +22,9 @@ * *
  * {@code
- * multi.onSubscribe().invoke(sub -> System.out.println("subscribed"));
+ * multi.onSubscription().invoke(sub -> System.out.println("subscribed"));
  * // Delay the subscription by 1 second (or until an asynchronous action completes)
- * multi.onSubscribe().call(sub -> Uni.createFrom(1).onItem().delayIt().by(Duration.ofSecond(1)));
+ * multi.onSubscription().call(sub -> Uni.createFrom(1).onItem().delayIt().by(Duration.ofSecond(1)));
  * }
  * 
* diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/UniOnSubscribe.java b/implementation/src/main/java/io/smallrye/mutiny/groups/UniOnSubscribe.java index 116a41f3b..9bdfea5bd 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/UniOnSubscribe.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/UniOnSubscribe.java @@ -22,9 +22,9 @@ * *
  * {@code
- * uni.onSubscribe().invoke(sub -> System.out.println("subscribed"));
+ * uni.onSubscription().invoke(sub -> System.out.println("subscribed"));
  * // Delay the subscription by 1 second (or until an asynchronous action completes)
- * uni.onSubscribe().call(sub -> Uni.createFrom(1).onItem().delayIt().by(Duration.ofSecond(1)));
+ * uni.onSubscription().call(sub -> Uni.createFrom(1).onItem().delayIt().by(Duration.ofSecond(1)));
  *}
  * 
* diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/MultiOnSubscribeSpy.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/MultiOnSubscribeSpy.java index 54b1b3097..d5456dd19 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/MultiOnSubscribeSpy.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/MultiOnSubscribeSpy.java @@ -25,7 +25,7 @@ public void reset() { @Override public void subscribe(MultiSubscriber downstream) { - upstream.onSubscribe().invoke(subscription -> { + upstream.onSubscription().invoke(subscription -> { incrementInvocationCount(); lastSubscription = subscription; }).subscribe().withSubscriber(downstream); diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/Spy.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/Spy.java index 13b1202bd..0d8b27a9d 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/Spy.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/Spy.java @@ -25,7 +25,7 @@ public interface Spy { // --------------------------------------------------------------------- // /** - * Spy {@link Uni#onSubscribe()} events. + * Spy {@link Uni#onSubscription()} events. * * @param upstream the upstream * @param the item type @@ -219,7 +219,7 @@ static MultiOnRequestSpy onRequest(Multi upstream) { } /** - * Spy {@link Multi#onSubscribe()} events. + * Spy {@link Multi#onSubscription()} events. * * @param upstream the upstream * @param the items type diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnSubscribeSpy.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnSubscribeSpy.java index 816f28b67..857d07791 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnSubscribeSpy.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/spies/UniOnSubscribeSpy.java @@ -25,7 +25,7 @@ public void reset() { @Override public void subscribe(UniSubscriber downstream) { upstream() - .onSubscribe().invoke(uniSubscription -> { + .onSubscription().invoke(uniSubscription -> { incrementInvocationCount(); lastSubscription = uniSubscription; }) diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java index 9848acfde..b404e4dbd 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java @@ -115,6 +115,11 @@ public MultiOnSubscribe onSubscribe() { return new MultiOnSubscribe<>(this); } + @Override + public MultiOnSubscribe onSubscription() { + return new MultiOnSubscribe<>(this); + } + @Override public MultiBroadcast broadcast() { return new MultiBroadcast<>(this); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractUni.java b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractUni.java index c8cf0a9ba..2a8f52dd5 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractUni.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractUni.java @@ -74,6 +74,11 @@ public UniOnSubscribe onSubscribe() { return new UniOnSubscribe<>(this); } + @Override + public UniOnSubscribe onSubscription() { + return new UniOnSubscribe<>(this); + } + @Override public UniOnItemOrFailure onItemOrFailure() { return new UniOnItemOrFailure<>(this); diff --git a/implementation/src/main/java/io/smallrye/mutiny/subscription/UniSerializedSubscriber.java b/implementation/src/main/java/io/smallrye/mutiny/subscription/UniSerializedSubscriber.java index 0608eeabe..6c4ba1aeb 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/subscription/UniSerializedSubscriber.java +++ b/implementation/src/main/java/io/smallrye/mutiny/subscription/UniSerializedSubscriber.java @@ -109,7 +109,7 @@ public void onFailure(Throwable throwable) { public void cancel() { if (state.compareAndSet(HAS_SUBSCRIPTION, DONE)) { while (subscription == null) { - // We are in the middle of a race condition with onSubscribe() + // We are in the middle of a race condition with onSubscription() } if (subscription != null) { // May have been cancelled already by another thread. subscription.cancel(); diff --git a/implementation/src/test/java/io/smallrye/mutiny/groups/MultiDisjointTest.java b/implementation/src/test/java/io/smallrye/mutiny/groups/MultiDisjointTest.java index ed3ec5dfe..67b835635 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/groups/MultiDisjointTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/groups/MultiDisjointTest.java @@ -26,7 +26,7 @@ public void testWithMultis() { Multi.createFrom().items("d", "e"), Multi.createFrom().empty(), Multi.createFrom().items("f", "g") - .onSubscribe().invoke(() -> subscribed.set(true))) + .onSubscription().invoke(() -> subscribed.set(true))) .onItem(). disjoint() .subscribe().withSubscriber(AssertSubscriber.create(4)); assertThat(subscribed).isFalse(); @@ -95,7 +95,7 @@ public void testWithMultisWithAFailure() { Multi.createFrom().items("d", "e"), Multi.createFrom().failure(new IOException("boom")), Multi.createFrom().items("f", "g") - .onSubscribe().invoke(() -> subscribed.set(true))) + .onSubscription().invoke(() -> subscribed.set(true))) .onItem(). disjoint() .subscribe().withSubscriber(AssertSubscriber.create(4)); assertThat(subscribed).isFalse(); @@ -116,7 +116,7 @@ public void testWithMultisWithOneEmittingAFailure() { e.fail(new IOException("boom")); }), Multi.createFrom().items("g") - .onSubscribe().invoke(() -> subscribed.set(true))) + .onSubscription().invoke(() -> subscribed.set(true))) .onItem(). disjoint() .subscribe().withSubscriber(AssertSubscriber.create(4)); assertThat(subscribed).isFalse(); diff --git a/implementation/src/test/java/io/smallrye/mutiny/helpers/spies/SpyTest.java b/implementation/src/test/java/io/smallrye/mutiny/helpers/spies/SpyTest.java index e7c826388..39db7d9bd 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/helpers/spies/SpyTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/helpers/spies/SpyTest.java @@ -23,7 +23,7 @@ class SpyTest { @Nested class SpyUni { @Test - @DisplayName("Spy onSubscribe()") + @DisplayName("Spy onSubscription()") void spyOnSubscribeSpy() { UniOnSubscribeSpy spy = Spy.onSubscribe(Uni.createFrom().item(69)); UniAssertSubscriber subscriber = spy.subscribe().withSubscriber(UniAssertSubscriber.create()); @@ -489,7 +489,7 @@ void spyOnRequest() { } @Test - @DisplayName("Spy onSubscribe()") + @DisplayName("Spy onSubscription()") void spyOnSubscribe() { MultiOnSubscribeSpy spy = Spy.onSubscribe(Multi.createFrom().items(1, 2, 3)); AssertSubscriber subscriber = spy.subscribe().withSubscriber(AssertSubscriber.create(10)); diff --git a/implementation/src/test/java/io/smallrye/mutiny/helpers/test/AssertSubscriberTest.java b/implementation/src/test/java/io/smallrye/mutiny/helpers/test/AssertSubscriberTest.java index b9dd1cda1..463172b50 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/helpers/test/AssertSubscriberTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/helpers/test/AssertSubscriberTest.java @@ -323,18 +323,18 @@ public void testAwaitSubscription() { // Delay subscriber = Multi.createFrom().items(1) - .onSubscribe().call(x -> smallDelay) + .onSubscription().call(x -> smallDelay) .subscribe().withSubscriber(AssertSubscriber.create(0)); assertThat(subscriber.awaitSubscription()).isSameAs(subscriber); subscriber = Multi.createFrom().items(1) - .onSubscribe().call(x -> smallDelay) + .onSubscription().call(x -> smallDelay) .subscribe().withSubscriber(AssertSubscriber.create(0)); assertThat(subscriber.awaitSubscription(MEDIUM)).isSameAs(subscriber); // timeout subscriber = Multi.createFrom().items(1) - .onSubscribe().call(x -> mediumDelay) + .onSubscription().call(x -> mediumDelay) .subscribe().withSubscriber(AssertSubscriber.create(0)); AssertSubscriber tmp = subscriber; assertThatThrownBy(() -> tmp.awaitSubscription(SMALL)).isInstanceOf(AssertionError.class) diff --git a/implementation/src/test/java/io/smallrye/mutiny/helpers/test/UniAssertSubscriberTest.java b/implementation/src/test/java/io/smallrye/mutiny/helpers/test/UniAssertSubscriberTest.java index 694510020..841e87209 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/helpers/test/UniAssertSubscriberTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/helpers/test/UniAssertSubscriberTest.java @@ -244,18 +244,18 @@ public void testAwaitSubscription() { // Delay subscriber = Uni.createFrom().item(1) - .onSubscribe().call(x -> Uni.createFrom().item(x).onItem().delayIt().by(Duration.ofSeconds(1))) + .onSubscription().call(x -> Uni.createFrom().item(x).onItem().delayIt().by(Duration.ofSeconds(1))) .subscribe().withSubscriber(UniAssertSubscriber.create()); assertThat(subscriber.awaitSubscription()).isSameAs(subscriber); subscriber = Uni.createFrom().item(1) - .onSubscribe().call(x -> Uni.createFrom().item(x).onItem().delayIt().by(Duration.ofSeconds(1))) + .onSubscription().call(x -> Uni.createFrom().item(x).onItem().delayIt().by(Duration.ofSeconds(1))) .subscribe().withSubscriber(UniAssertSubscriber.create()); assertThat(subscriber.awaitSubscription(Duration.ofSeconds(5))).isSameAs(subscriber); // timeout subscriber = Uni.createFrom().item(1) - .onSubscribe().call(x -> Uni.createFrom().item(x).onItem().delayIt().by(Duration.ofSeconds(10))) + .onSubscription().call(x -> Uni.createFrom().item(x).onItem().delayIt().by(Duration.ofSeconds(10))) .subscribe().withSubscriber(UniAssertSubscriber.create()); UniAssertSubscriber tmp = subscriber; assertThatThrownBy(() -> tmp.awaitSubscription(Duration.ofMillis(100))).isInstanceOf(AssertionError.class) diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnEventTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnEventTest.java index ff7345b71..889ec9920 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnEventTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnEventTest.java @@ -45,7 +45,7 @@ public void testCallbacksWhenItemIsEmitted() { AtomicBoolean cancellation = new AtomicBoolean(); Multi.createFrom().item(1) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onItem().invoke(item::set) .onFailure().invoke(failure::set) .onItem().invoke(() -> invokedOnItemRunnable.set(true)) @@ -96,7 +96,7 @@ public void testCallbacksWhenItemIsEmittedUsingOnAndThenGroup() { AtomicBoolean cancellation = new AtomicBoolean(); Multi.createFrom().item(1) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onItem().invoke(item::set) .onFailure().invoke(failure::set) .onTermination().invoke(() -> completion.set(true)) @@ -135,7 +135,7 @@ public void testCallbacksWhenItemIsEmittedWithDeprecatedApis() { AtomicBoolean cancellation = new AtomicBoolean(); Multi.createFrom().item(1) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onItem().invoke(item::set) .onFailure().invoke(failure::set) .onCompletion().invoke(() -> completion.set(true)) @@ -172,7 +172,7 @@ public void testCallbacksOnFailure() { AtomicBoolean cancellation = new AtomicBoolean(); Multi.createFrom(). failure(new IOException("boom")) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onItem().invoke(item::set) .onFailure().invoke(failure::set) .onCompletion().invoke(() -> completion.set(true)) @@ -204,7 +204,7 @@ public void testCallbacksOnFailureWhenPredicateMatches() { AtomicBoolean cancellation = new AtomicBoolean(); Multi.createFrom(). failure(new IOException("boom")) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onItem().invoke(item::set) .onFailure(IOException.class).invoke(failure::set) .onCompletion().invoke(() -> completion.set(true)) @@ -234,7 +234,7 @@ public void testCallbacksOnFailureWhenPredicateDoesNotPass() { AtomicBoolean cancellation = new AtomicBoolean(); Multi.createFrom(). failure(new IOException("boom")) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onItem().invoke(item::set) .onFailure(f -> f.getMessage().contains("missing")).invoke(failure::set) .onCompletion().invoke(() -> completion.set(true)) @@ -268,7 +268,7 @@ public void testCallbacksOnFailureWhenPredicateThrowsAnException() { }; Multi.createFrom(). failure(new IOException("smallboom")) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onItem().invoke(item::set) .onFailure(boom).invoke(failure::set) .onCompletion().invoke(() -> completion.set(true)) @@ -300,7 +300,7 @@ public void testCallbacksOnCompletion() { AtomicBoolean cancellation = new AtomicBoolean(); Multi.createFrom(). empty() - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onItem().invoke(item::set) .onFailure().invoke(failure::set) .onCompletion().invoke(() -> completion.set(true)) @@ -334,7 +334,7 @@ public void testWithNoEvents() { AtomicBoolean cancellation = new AtomicBoolean(); AssertSubscriber subscriber = Multi.createFrom(). nothing() - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onItem().invoke(item::set) .onFailure().invoke(failure::set) .onCompletion().invoke(() -> completion.set(true)) diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryTest.java index 21ea6ad20..5f0116f0e 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryTest.java @@ -25,7 +25,7 @@ public void init() { numberOfSubscriptions = new AtomicInteger(); failing = Multi.createFrom() . emitter(emitter -> emitter.emit(1).emit(2).emit(3).fail(new IOException("boom"))) - .onSubscribe().invoke(s -> numberOfSubscriptions.incrementAndGet()); + .onSubscription().invoke(s -> numberOfSubscriptions.incrementAndGet()); } @Test diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryWhenTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryWhenTest.java index d84a8425d..950702cc0 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryWhenTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryWhenTest.java @@ -31,7 +31,7 @@ public void init() { numberOfSubscriptions = new AtomicInteger(); failingAfter2 = Multi.createFrom() . emitter(emitter -> emitter.emit(1).emit(2).fail(new IOException("boom"))) - .onSubscribe().invoke(s -> numberOfSubscriptions.incrementAndGet()); + .onSubscription().invoke(s -> numberOfSubscriptions.incrementAndGet()); failingAfter1 = Multi.createBy().concatenating() .streams(Multi.createFrom().item(1), Multi.createFrom().failure(new RuntimeException("boom"))); @@ -71,7 +71,7 @@ public void testWithOtherStreamFailing() { AtomicBoolean subscribed = new AtomicBoolean(); AtomicBoolean cancelled = new AtomicBoolean(); Multi multi = failingAfter1 - .onSubscribe().invoke(sub -> subscribed.set(true)) + .onSubscription().invoke(sub -> subscribed.set(true)) .onCancellation().invoke(() -> cancelled.set(true)); AssertSubscriber subscriber = multi @@ -92,7 +92,7 @@ public void testWhatTheWhenStreamFailsTheUpstreamIsCancelled() { AtomicBoolean subscribed = new AtomicBoolean(); AtomicBoolean cancelled = new AtomicBoolean(); Multi multi = failingAfter1 - .onSubscribe().invoke(sub -> subscribed.set(true)) + .onSubscription().invoke(sub -> subscribed.set(true)) .onCancellation().invoke(() -> cancelled.set(true)); AtomicInteger count = new AtomicInteger(); @@ -117,7 +117,7 @@ public void testCompletionWhenOtherStreamCompletes() { AtomicBoolean subscribed = new AtomicBoolean(); AtomicBoolean cancelled = new AtomicBoolean(); Multi source = failingAfter1 - .onSubscribe().invoke(sub -> subscribed.set(true)) + .onSubscription().invoke(sub -> subscribed.set(true)) .onCancellation().invoke(() -> cancelled.set(true)); Multi retry = source @@ -137,7 +137,7 @@ public void testCompletionWhenOtherStreamCompletes() { public void testAfterOnRetryAndCompletion() { AtomicBoolean sourceSubscribed = new AtomicBoolean(); Multi source = failingAfter1 - .onSubscribe().invoke(sub -> sourceSubscribed.set(true)); + .onSubscription().invoke(sub -> sourceSubscribed.set(true)); Multi retry = source .onFailure().retry().when(other -> other.select().first()); diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureTest.java index 831f705a8..5472d531e 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureTest.java @@ -320,7 +320,7 @@ public void testRecoverWithMultiUsingEmitterAsFallback() { AtomicInteger subscribed = new AtomicInteger(); Multi fallback = Multi.createFrom() . emitter(s -> s.emit(42).emit(43).complete()) - .onSubscribe().invoke(s -> subscribed.incrementAndGet()); + .onSubscription().invoke(s -> subscribed.incrementAndGet()); multi.onFailure() .recoverWithMulti(fallback) @@ -349,7 +349,7 @@ public void testRecoverWithItem2() { AtomicInteger subscribed = new AtomicInteger(); Multi fallback = Multi.createFrom() .item(0) - .onSubscribe().invoke(s -> subscribed.incrementAndGet()); + .onSubscription().invoke(s -> subscribed.incrementAndGet()); multi.onFailure() .recoverWithMulti(fallback) diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnRequestTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnRequestTest.java index 4eac8b20e..732427ff9 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnRequestTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnRequestTest.java @@ -94,7 +94,7 @@ public void testCallWithSupplier() { Multi.createFrom().item(1) .onRequest().call(() -> Uni.createFrom().item("ok") - .onSubscribe().invoke(() -> called.set(true))) + .onSubscription().invoke(() -> called.set(true))) .subscribe().withSubscriber(subscriber); subscriber.request(10); diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnSubscribeTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnSubscribeTest.java index 229dd5b0a..db4c22210 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnSubscribeTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnSubscribeTest.java @@ -32,7 +32,7 @@ public void testInvoke() { AtomicInteger count = new AtomicInteger(); AtomicReference reference = new AtomicReference<>(); Multi multi = Multi.createFrom().items(1, 2, 3) - .onSubscribe().invoke(s -> { + .onSubscription().invoke(s -> { reference.set(s); count.incrementAndGet(); }); @@ -59,7 +59,7 @@ public void testDeprecatedOnSubscribed() { AtomicInteger count = new AtomicInteger(); AtomicReference reference = new AtomicReference<>(); Multi multi = Multi.createFrom().items(1, 2, 3) - .onSubscribe().invoke(s -> { + .onSubscription().invoke(s -> { reference.set(s); count.incrementAndGet(); }); @@ -87,11 +87,11 @@ public void testCall() { AtomicReference reference = new AtomicReference<>(); AtomicReference sub = new AtomicReference<>(); Multi multi = Multi.createFrom().items(1, 2, 3) - .onSubscribe().call(s -> { + .onSubscription().call(s -> { reference.set(s); count.incrementAndGet(); return Uni.createFrom().nullItem() - .onSubscribe().invoke(sub::set); + .onSubscription().invoke(sub::set); }); AssertSubscriber subscriber = AssertSubscriber.create(10); @@ -116,10 +116,10 @@ public void testCallWithSupplier() { AtomicInteger count = new AtomicInteger(); AtomicReference sub = new AtomicReference<>(); Multi multi = Multi.createFrom().items(1, 2, 3) - .onSubscribe().call(() -> { + .onSubscription().call(() -> { count.incrementAndGet(); return Uni.createFrom().nullItem() - .onSubscribe().invoke(sub::set); + .onSubscription().invoke(sub::set); }); AssertSubscriber subscriber = AssertSubscriber.create(10); @@ -139,7 +139,7 @@ public void testCallWithSupplier() { @Test public void testInvokeThrowingException() { Multi multi = Multi.createFrom().items(1, 2, 3) - .onSubscribe().invoke(s -> { + .onSubscription().invoke(s -> { throw new IllegalStateException("boom"); }); @@ -153,7 +153,7 @@ public void testInvokeThrowingException() { @Test public void testCallThrowingException() { Multi multi = Multi.createFrom().items(1, 2, 3) - .onSubscribe().call(s -> { + .onSubscription().call(s -> { throw new IllegalStateException("boom"); }); @@ -167,7 +167,7 @@ public void testCallThrowingException() { @Test public void testCallProvidingFailure() { Multi multi = Multi.createFrom().items(1, 2, 3) - .onSubscribe().call(s -> Uni.createFrom().failure(new IOException("boom"))); + .onSubscription().call(s -> Uni.createFrom().failure(new IOException("boom"))); AssertSubscriber subscriber = AssertSubscriber.create(); @@ -179,7 +179,7 @@ public void testCallProvidingFailure() { @Test public void testCallReturningNullUni() { Multi multi = Multi.createFrom().items(1, 2, 3) - .onSubscribe().call(s -> null); + .onSubscription().call(s -> null); AssertSubscriber subscriber = AssertSubscriber.create(); @@ -190,13 +190,13 @@ public void testCallReturningNullUni() { @Test public void testThatInvokeConsumerCannotBeNull() { assertThrows(IllegalArgumentException.class, () -> Multi.createFrom().items(1, 2, 3) - .onSubscribe().invoke((Consumer) null)); + .onSubscription().invoke((Consumer) null)); } @Test public void testThatCallFunctionCannotBeNull() { assertThrows(IllegalArgumentException.class, () -> Multi.createFrom().items(1, 2, 3) - .onSubscribe().call((Function>) null)); + .onSubscription().call((Function>) null)); } @Test @@ -215,7 +215,7 @@ public void testThatCallUpstreamCannotBeNull() { public void testThatSubscriptionIsNotPassedDownstreamUntilInvokeCallbackCompletes() { CountDownLatch latch = new CountDownLatch(1); AssertSubscriber subscriber = Multi.createFrom().items(1, 2, 3) - .onSubscribe().invoke(s -> { + .onSubscription().invoke(s -> { try { latch.await(); } catch (InterruptedException e) { @@ -238,7 +238,7 @@ public void testThatSubscriptionIsNotPassedDownstreamUntilInvokeCallbackComplete public void testThatSubscriptionIsNotPassedDownstreamUntilProducedUniCompletes() { AtomicReference> emitter = new AtomicReference<>(); AssertSubscriber subscriber = Multi.createFrom().items(1, 2, 3) - .onSubscribe() + .onSubscription() .call(s -> Uni.createFrom().emitter((Consumer>) emitter::set)) .subscribe().withSubscriber(AssertSubscriber.create(3)); @@ -258,7 +258,7 @@ public void testThatSubscriptionIsNotPassedDownstreamUntilProducedUniCompletes() public void testThatSubscriptionIsNotPassedDownstreamUntilProducedUniCompletesWithDifferentThread() { AtomicReference> emitter = new AtomicReference<>(); AssertSubscriber subscriber = Multi.createFrom().items(1, 2, 3) - .onSubscribe() + .onSubscription() .call(s -> Uni.createFrom().emitter((Consumer>) emitter::set)) .runSubscriptionOn(Infrastructure.getDefaultExecutor()) .subscribe().withSubscriber(AssertSubscriber.create(3)); diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnTerminationUniInvokeTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnTerminationUniInvokeTest.java index c89dfe001..adcd12296 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnTerminationUniInvokeTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnTerminationUniInvokeTest.java @@ -38,7 +38,7 @@ public void testTerminationWhenErrorIsEmitted() { AtomicBoolean terminationCancelledFlag = new AtomicBoolean(); Multi.createFrom().failure(new IOException("boom")) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onItem().invoke(item::set) .onFailure().invoke(failure::set) .onCompletion().invoke(() -> completion.set(true)) @@ -86,7 +86,7 @@ public void testTerminationWhenItemIsEmittedButUniInvokeIsFailed() { AtomicBoolean terminationCancelledFlag = new AtomicBoolean(); Multi.createFrom().item(1) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onItem().invoke(item::set) .onFailure().invoke(failure::set) .onCompletion().invoke(() -> completion.set(true)) @@ -133,7 +133,7 @@ public void testTerminationWhenItemIsEmittedButUniInvokeThrowsException() { AtomicBoolean terminationCancelledFlag = new AtomicBoolean(); Multi.createFrom().item(1) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onItem().invoke(item::set) .onFailure().invoke(failure::set) .onCompletion().invoke(() -> completion.set(true)) @@ -180,7 +180,7 @@ public void testTerminationWhenErrorIsEmittedButUniInvokeIsFailed() { AtomicBoolean terminationCancelledFlag = new AtomicBoolean(); Multi.createFrom().failure(new IOException("boom")) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onItem().invoke(item::set) .onFailure().invoke(failure::set) .onCompletion().invoke(() -> completion.set(true)) @@ -233,7 +233,7 @@ public void testTerminationWhenErrorIsEmittedButUniInvokeThrowsException() { AtomicBoolean terminationCancelledFlag = new AtomicBoolean(); Multi.createFrom().failure(new IOException("boom")) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onItem().invoke(item::set) .onFailure().invoke(failure::set) .onCompletion().invoke(() -> completion.set(true)) diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiTransformToMultiTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiTransformToMultiTest.java index 9566652f7..59e105ea3 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiTransformToMultiTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiTransformToMultiTest.java @@ -749,7 +749,7 @@ public void testMaxConcurrency() { final AtomicInteger subscriptionTracker = new AtomicInteger(); Multi multi = Multi.createFrom().range(0, 100) .onItem().transformToMulti(i -> Multi.createFrom().items(i + 1, i + 2, i + 3) - .onSubscribe().invoke(s -> { + .onSubscription().invoke(s -> { int n = subscriptionTracker.getAndIncrement(); if (n >= maxConcurrency) { Assertions.fail("Too many subscriptions: " + n); @@ -783,7 +783,7 @@ public void testMaxConcurrencyNormal() { final AtomicInteger subscriptionTracker = new AtomicInteger(); Multi multi = Multi.createFrom().items(1, 2, 3) .onItem().transformToMulti(i -> Multi.createFrom().items(4, 5, 6) - .onSubscribe().invoke(s -> { + .onSubscription().invoke(s -> { int n = subscriptionTracker.getAndIncrement(); if (n >= maxConcurrency) { Assertions.fail("Too many subscriptions: " + n); diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/UniOnEventTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/UniOnEventTest.java index 703587cb4..60ed1dfc7 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/UniOnEventTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/UniOnEventTest.java @@ -30,7 +30,7 @@ public void testActionsOnItem() { UniAssertSubscriber subscriber = Uni.createFrom().item(1) .onItem().invoke(item::set) .onFailure().invoke(failure::set) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onTermination().invoke((r, f, c) -> terminate.set(r)) .subscribe().withSubscriber(UniAssertSubscriber.create()); @@ -50,7 +50,7 @@ public void testActionsUsingOnAndThenGroup() { UniAssertSubscriber subscriber = Uni.createFrom().item(1) .onItem().invoke(item::set) .onFailure().invoke(failure::set) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onTermination().invoke((r, f, c) -> terminate.set(r)) .subscribe().withSubscriber(UniAssertSubscriber.create()); @@ -70,7 +70,7 @@ public void testActionsOnItem2() { UniAssertSubscriber subscriber = Uni.createFrom().item(1) .onItem().invoke(item::set) .onFailure().invoke(failure::set) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onTermination().invoke(() -> terminate.set(true)) .subscribe().withSubscriber(UniAssertSubscriber.create()); @@ -90,7 +90,7 @@ public void testActionsOnFailures() { UniAssertSubscriber subscriber = Uni.createFrom(). failure(new IOException("boom")) .onItem().invoke(item::set) .onFailure().invoke(failure::set) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onTermination().invoke((r, f, c) -> terminate.set(f)) .subscribe().withSubscriber(UniAssertSubscriber.create()); @@ -110,7 +110,7 @@ public void testActionsOnFailures2() { UniAssertSubscriber subscriber = Uni.createFrom(). failure(new IOException("boom")) .onItem().invoke(item::set) .onFailure().invoke(failure::set) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onTermination().invoke(() -> terminate.set(true)) .subscribe().withSubscriber(UniAssertSubscriber.create()); @@ -133,7 +133,7 @@ public void testWhenOnItemThrowsAnException() { throw new IllegalStateException("boom"); }) .onFailure().invoke(failure::set) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onTermination().invoke((r, f, c) -> { if (r != null) { ItemFromTerminate.set(r); @@ -161,7 +161,7 @@ public void testWhenOnItemThrowsAnException2() { throw new IllegalStateException("boom"); }) .onFailure().invoke(failure::set) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onTermination().invoke(() -> terminated.set(true)) .subscribe().withSubscriber(UniAssertSubscriber.create()); @@ -183,7 +183,7 @@ public void testWhenOnFailureThrowsAnException() { .onFailure().invoke(e -> { throw new IllegalStateException("boom"); }) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onTermination().invoke((r, f, c) -> { if (r != null) { ItemFromTerminate.set(r); @@ -211,7 +211,7 @@ public void testWhenOnFailureThrowsAnException2() { .onFailure().invoke(e -> { throw new IllegalStateException("boom"); }) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onTermination().invoke(() -> terminated.set(true)) .subscribe().withSubscriber(UniAssertSubscriber.create()); @@ -226,7 +226,7 @@ public void testWhenOnFailureThrowsAnException2() { @Test public void testWhenOnSubscriptionThrowsAnException() { UniAssertSubscriber subscriber = Uni.createFrom().item(1) - .onSubscribe().invoke(s -> { + .onSubscription().invoke(s -> { throw new IllegalStateException("boom"); }).subscribe().withSubscriber(UniAssertSubscriber.create()); @@ -490,7 +490,7 @@ public void testActionsOnTerminationCallWithResult() { UniAssertSubscriber subscriber = Uni.createFrom().item(1) .onItem().invoke(Item::set) .onFailure().invoke(failure::set) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onTermination().call((r, f, c) -> { terminate.set(r); return Uni.createFrom().item(r * 100); @@ -513,7 +513,7 @@ public void testActionsOnTerminationWithFailure() { UniAssertSubscriber subscriber = Uni.createFrom(). failure(new IOException("boom")) .onItem().invoke(Item::set) .onFailure().invoke(failure::set) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onTermination().call((r, f, c) -> { terminate.set(f); return Uni.createFrom().failure(new IOException("tada")); @@ -538,7 +538,7 @@ public void testActionsOnTerminationCallWithSupplierWithResult() { UniAssertSubscriber subscriber = Uni.createFrom().item(1) .onItem().invoke(Item::set) .onFailure().invoke(failure::set) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onTermination().call(() -> { terminate.set(true); return Uni.createFrom().item(100); @@ -561,7 +561,7 @@ public void testActionsOnTerminationWithSupplierOnFailure() { UniAssertSubscriber subscriber = Uni.createFrom(). failure(new IOException("boom")) .onItem().invoke(Item::set) .onFailure().invoke(failure::set) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onTermination().call(() -> { terminate.set(true); return Uni.createFrom().failure(new IOException("tada")); @@ -586,7 +586,7 @@ public void testActionsOnTerminationWithMapperThrowingException() { UniAssertSubscriber subscriber = Uni.createFrom(). failure(new IOException("boom")) .onItem().invoke(Item::set) .onFailure().invoke(failure::set) - .onSubscribe().invoke(subscription::set) + .onSubscription().invoke(subscription::set) .onTermination().call((r, f, c) -> { terminate.set(f); throw new RuntimeException("tada"); diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/UniOnSubscribeTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/UniOnSubscribeTest.java index 00c5e9f9a..d7e6fb2f9 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/UniOnSubscribeTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/UniOnSubscribeTest.java @@ -28,7 +28,7 @@ public void testInvoke() { AtomicInteger count = new AtomicInteger(); AtomicReference reference = new AtomicReference<>(); Uni uni = Uni.createFrom().item(1) - .onSubscribe().invoke(s -> { + .onSubscription().invoke(s -> { reference.set(s); count.incrementAndGet(); }); @@ -53,7 +53,7 @@ public void testInvoke() { public void testInvokeRunnable() { AtomicInteger count = new AtomicInteger(); Uni uni = Uni.createFrom().item(1) - .onSubscribe().invoke(count::incrementAndGet); + .onSubscription().invoke(count::incrementAndGet); UniAssertSubscriber subscriber = UniAssertSubscriber.create(); @@ -72,7 +72,7 @@ public void testInvokeRunnable() { public void testOnSubscribed() { AtomicInteger count = new AtomicInteger(); AtomicReference reference = new AtomicReference<>(); - Uni uni = Uni.createFrom().item(1).onSubscribe().invoke(s -> { + Uni uni = Uni.createFrom().item(1).onSubscription().invoke(s -> { reference.set(s); count.incrementAndGet(); }); @@ -99,11 +99,11 @@ public void testCall() { AtomicReference reference = new AtomicReference<>(); AtomicReference sub = new AtomicReference<>(); Uni uni = Uni.createFrom().item(1) - .onSubscribe().call(s -> { + .onSubscription().call(s -> { reference.set(s); count.incrementAndGet(); return Uni.createFrom().nullItem() - .onSubscribe().invoke(sub::set); + .onSubscription().invoke(sub::set); }); UniAssertSubscriber subscriber = UniAssertSubscriber.create(); @@ -131,10 +131,10 @@ public void testCallWithSupplier() { AtomicInteger count = new AtomicInteger(); AtomicReference sub = new AtomicReference<>(); Uni uni = Uni.createFrom().item(1) - .onSubscribe().call(() -> { + .onSubscription().call(() -> { count.incrementAndGet(); return Uni.createFrom().nullItem() - .onSubscribe().invoke(sub::set); + .onSubscription().invoke(sub::set); }); UniAssertSubscriber subscriber = UniAssertSubscriber.create(); @@ -160,7 +160,7 @@ public void testDelayedCallAfterFailure() { AtomicReference reference = new AtomicReference<>(); AtomicReference sub = new AtomicReference<>(); Uni uni = Uni.createFrom().failure(new IOException("boom")) - .onSubscribe().call(s -> { + .onSubscription().call(s -> { reference.set(s); count.incrementAndGet(); return Uni.createFrom().emitter(e -> { @@ -169,7 +169,7 @@ public void testDelayedCallAfterFailure() { e.complete("yo"); }).start(); }) - .onSubscribe().invoke(sub::set); + .onSubscription().invoke(sub::set); }); UniAssertSubscriber subscriber = UniAssertSubscriber.create(); @@ -195,7 +195,7 @@ public void testDelayedCallAfterFailure() { @Test public void testInvokeThrowingException() { Uni uni = Uni.createFrom().item(1) - .onSubscribe().invoke(s -> { + .onSubscription().invoke(s -> { throw new IllegalStateException("boom"); }); @@ -209,7 +209,7 @@ public void testInvokeThrowingException() { @Test public void testCallThrowingException() { Uni uni = Uni.createFrom().item(1) - .onSubscribe().call(s -> { + .onSubscription().call(s -> { throw new IllegalStateException("boom"); }); @@ -223,7 +223,7 @@ public void testCallThrowingException() { @Test public void testCallProvidingFailure() { Uni uni = Uni.createFrom().item(1) - .onSubscribe().call(s -> Uni.createFrom().failure(new IOException("boom"))); + .onSubscription().call(s -> Uni.createFrom().failure(new IOException("boom"))); UniAssertSubscriber subscriber = UniAssertSubscriber.create(); @@ -235,7 +235,7 @@ public void testCallProvidingFailure() { @Test public void testCallReturningNullUni() { Uni uni = Uni.createFrom().item(1) - .onSubscribe().call(s -> null); + .onSubscription().call(s -> null); UniAssertSubscriber subscriber = UniAssertSubscriber.create(); @@ -247,13 +247,13 @@ public void testCallReturningNullUni() { @Test public void testThatInvokeConsumerCannotBeNull() { assertThrows(IllegalArgumentException.class, () -> Uni.createFrom().item(1) - .onSubscribe().invoke((Consumer) null)); + .onSubscription().invoke((Consumer) null)); } @Test public void testThatCallFunctionCannotBeNull() { assertThrows(IllegalArgumentException.class, () -> Uni.createFrom().item(1) - .onSubscribe().call((Function>) null)); + .onSubscription().call((Function>) null)); } @Test @@ -267,7 +267,7 @@ public void testThatCallUpstreamCannotBeNull() { public void testThatSubscriptionIsNotPassedDownstreamUntilInvokeCallbackCompletes() { CountDownLatch latch = new CountDownLatch(1); UniAssertSubscriber subscriber = Uni.createFrom().item(1) - .onSubscribe().invoke(s -> { + .onSubscription().invoke(s -> { try { latch.await(); } catch (InterruptedException e) { @@ -289,7 +289,7 @@ public void testThatSubscriptionIsNotPassedDownstreamUntilInvokeCallbackComplete public void testThatSubscriptionIsNotPassedDownstreamUntilProducedUniCompletes() { AtomicReference> emitter = new AtomicReference<>(); UniAssertSubscriber subscriber = Uni.createFrom().item(() -> 1) - .onSubscribe() + .onSubscription() .call(s -> Uni.createFrom().emitter((Consumer>) emitter::set)) .subscribe().withSubscriber(UniAssertSubscriber.create()); @@ -308,7 +308,7 @@ public void testThatSubscriptionIsNotPassedDownstreamUntilProducedUniCompletes() public void testThatSubscriptionIsNotPassedDownstreamUntilProducedUniCompletesWithDifferentThread() { AtomicReference> emitter = new AtomicReference<>(); UniAssertSubscriber subscriber = Uni.createFrom().item(() -> 1) - .onSubscribe() + .onSubscription() .call(s -> Uni.createFrom().emitter((Consumer>) emitter::set)) .runSubscriptionOn(Infrastructure.getDefaultExecutor()) .subscribe().withSubscriber(UniAssertSubscriber.create()); diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/UniZipTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/UniZipTest.java index c14617fad..93e3b7a72 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/UniZipTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/UniZipTest.java @@ -382,36 +382,36 @@ public void testWithArraysWithNoResultAndCancellation() { // We need 10 unis to avoid being handled as tuples Uni uni1 = Uni.createFrom().item(1) - .onSubscribe().invoke(s -> subscriptions[0].set(true)) + .onSubscription().invoke(s -> subscriptions[0].set(true)) .onCancellation().invoke(() -> cancellations[0].set(true)); Uni uni2 = Uni.createFrom().item(2) - .onSubscribe().invoke(s -> subscriptions[1].set(true)) + .onSubscription().invoke(s -> subscriptions[1].set(true)) .onCancellation().invoke(() -> cancellations[1].set(true)); Uni uni3 = Uni.createFrom().item(3) - .onSubscribe().invoke(s -> subscriptions[2].set(true)) + .onSubscription().invoke(s -> subscriptions[2].set(true)) .onCancellation().invoke(() -> cancellations[2].set(true)); Uni uni4 = Uni.createFrom().item(4) - .onSubscribe().invoke(s -> subscriptions[3].set(true)) + .onSubscription().invoke(s -> subscriptions[3].set(true)) .onCancellation().invoke(() -> cancellations[3].set(true)); Uni uni5 = Uni.createFrom().item(5) - .onSubscribe().invoke(s -> subscriptions[4].set(true)) + .onSubscription().invoke(s -> subscriptions[4].set(true)) .onCancellation().invoke(() -> cancellations[4].set(true)); Uni uni6 = Uni.createFrom().item(6) - .onSubscribe().invoke(s -> subscriptions[5].set(true)) + .onSubscription().invoke(s -> subscriptions[5].set(true)) .onCancellation().invoke(() -> cancellations[5].set(true)); Uni uni7 = Uni.createFrom(). emitter(e -> { // Do not emit }) - .onSubscribe().invoke(s -> subscriptions[6].set(true)) + .onSubscription().invoke(s -> subscriptions[6].set(true)) .onCancellation().invoke(() -> cancellations[6].set(true)); Uni uni8 = Uni.createFrom().item(() -> 8) - .onSubscribe().invoke(s -> subscriptions[7].set(true)) + .onSubscription().invoke(s -> subscriptions[7].set(true)) .onCancellation().invoke(() -> cancellations[7].set(true)); Uni uni9 = Uni.createFrom().item(() -> 9) - .onSubscribe().invoke(s -> subscriptions[8].set(true)) + .onSubscription().invoke(s -> subscriptions[8].set(true)) .onCancellation().invoke(() -> cancellations[8].set(true)); Uni uni10 = Uni.createFrom().item(() -> 10) - .onSubscribe().invoke(s -> subscriptions[9].set(true)) + .onSubscription().invoke(s -> subscriptions[9].set(true)) .onCancellation().invoke(() -> cancellations[9].set(true)); Uni all = Uni.combine().all().unis(uni1, uni2, uni3, uni4, uni5, uni6, uni7, uni8, uni9, uni10) diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/builders/MultiFromResourceFromUniTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/builders/MultiFromResourceFromUniTest.java index b25535b18..60001bbd9 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/builders/MultiFromResourceFromUniTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/builders/MultiFromResourceFromUniTest.java @@ -624,7 +624,7 @@ public void testOnCompletionWithSingleFinalizer() { .resourceFromUni(() -> Uni.createFrom().item(1), x -> Multi.createFrom().range(x, 11)) .withFinalizer(r -> { return Uni.createFrom().item("ok") - .onSubscribe().invoke(s -> subscribed.set(true)) + .onSubscription().invoke(s -> subscribed.set(true)) .onItem().ignore().andContinueWithNull(); }); multi.subscribe().withSubscriber(AssertSubscriber.create(20)) @@ -641,7 +641,7 @@ public void testOnFailureWithSingleFinalizer() { x -> Multi.createFrom().range(x, 11).onCompletion().failWith(new IOException("boom"))) .withFinalizer(r -> { return Uni.createFrom().item("ok") - .onSubscribe().invoke(s -> subscribed.set(true)) + .onSubscription().invoke(s -> subscribed.set(true)) .onItem().ignore().andContinueWithNull(); }); multi.subscribe().withSubscriber(AssertSubscriber.create(20)) @@ -658,7 +658,7 @@ public void testOnCancellationWithSingleFinalizer() { x -> Multi.createFrom().ticks().every(Duration.ofMillis(10))) .withFinalizer(r -> { return Uni.createFrom().item("ok") - .onSubscribe().invoke(s -> subscribed.set(true)) + .onSubscription().invoke(s -> subscribed.set(true)) .onItem().ignore().andContinueWithNull(); }) .select().first(5); @@ -745,25 +745,25 @@ static Uni create() { public Multi data() { return Multi.createFrom().item("in transaction") - .onSubscribe().invoke(s -> subscribed.set(true)); + .onSubscription().invoke(s -> subscribed.set(true)); } public Multi infinite() { return Multi.createFrom().ticks().every(Duration.ofMillis(10)) .onItem().transform(l -> Long.toString(l)) - .onSubscribe().invoke(s -> subscribed.set(true)); + .onSubscription().invoke(s -> subscribed.set(true)); } public Uni commit() { return Uni.createFrom().voidItem() - .onSubscribe().invoke(s -> onCompleteSubscribed.set(true)); + .onSubscription().invoke(s -> onCompleteSubscribed.set(true)); } public Uni commitFailure() { return Uni.createFrom().voidItem() .onItem().delayIt().by(DELAY) .onItem().failWith(x -> new IOException("commit failed")) - .onSubscribe().invoke(s -> onCompleteSubscribed.set(true)); + .onSubscription().invoke(s -> onCompleteSubscribed.set(true)); } public Uni commitReturningNull() { @@ -773,14 +773,14 @@ public Uni commitReturningNull() { public Uni rollback(Throwable failure) { return Uni.createFrom().voidItem() .onItem().invoke(x -> this.failure.set(failure)) - .onSubscribe().invoke(s -> onFailureSubscribed.set(true)); + .onSubscription().invoke(s -> onFailureSubscribed.set(true)); } public Uni rollbackDelay(Throwable failure) { return Uni.createFrom().voidItem() .onItem().invoke(x -> this.failure.set(failure)) .onItem().delayIt().by(DELAY) - .onSubscribe().invoke(s -> onFailureSubscribed.set(true)); + .onSubscription().invoke(s -> onFailureSubscribed.set(true)); } public Uni rollbackFailure(Throwable failure) { @@ -788,7 +788,7 @@ public Uni rollbackFailure(Throwable failure) { .onItem().invoke(x -> this.failure.set(failure)) .onItem().delayIt().by(DELAY) .onItem().failWith(x -> new IOException("rollback failed")) - .onSubscribe().invoke(s -> onFailureSubscribed.set(true)); + .onSubscription().invoke(s -> onFailureSubscribed.set(true)); } public Uni rollbackReturningNull(Throwable f) { @@ -798,7 +798,7 @@ public Uni rollbackReturningNull(Throwable f) { public Uni cancel() { return Uni.createFrom().voidItem() - .onSubscribe().invoke(s -> onCancelSubscribed.set(true)); + .onSubscription().invoke(s -> onCancelSubscribed.set(true)); } } } diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/builders/MultiFromResourceTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/builders/MultiFromResourceTest.java index f0ce4e3c6..180a90966 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/builders/MultiFromResourceTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/builders/MultiFromResourceTest.java @@ -613,7 +613,7 @@ public void testOnCompletionWithSingleFinalizer() { .resource(() -> 1, x -> Multi.createFrom().range(x, 11)) .withFinalizer(r -> { return Uni.createFrom().item("ok") - .onSubscribe().invoke(s -> subscribed.set(true)) + .onSubscription().invoke(s -> subscribed.set(true)) .onItem().ignore().andContinueWithNull(); }); multi.subscribe().withSubscriber(AssertSubscriber.create(20)) @@ -630,7 +630,7 @@ public void testOnFailureWithSingleFinalizer() { x -> Multi.createFrom().range(x, 11).onCompletion().failWith(new IOException("boom"))) .withFinalizer(r -> { return Uni.createFrom().item("ok") - .onSubscribe().invoke(s -> subscribed.set(true)) + .onSubscription().invoke(s -> subscribed.set(true)) .onItem().ignore().andContinueWithNull(); }); multi.subscribe().withSubscriber(AssertSubscriber.create(20)) @@ -646,7 +646,7 @@ public void testOnCancellationWithSingleFinalizer() { .resource(() -> 1, x -> Multi.createFrom().ticks().every(Duration.ofMillis(10))) .withFinalizer(r -> { return Uni.createFrom().item("ok") - .onSubscribe().invoke(s -> subscribed.set(true)) + .onSubscription().invoke(s -> subscribed.set(true)) .onItem().ignore().andContinueWithNull(); }) .select().first(5); @@ -708,25 +708,25 @@ static class FakeTransactionalResource { public Multi data() { return Multi.createFrom().item("in transaction") - .onSubscribe().invoke(s -> subscribed.set(true)); + .onSubscription().invoke(s -> subscribed.set(true)); } public Multi infinite() { return Multi.createFrom().ticks().every(Duration.ofMillis(10)) .onItem().transform(l -> Long.toString(l)) - .onSubscribe().invoke(s -> subscribed.set(true)); + .onSubscription().invoke(s -> subscribed.set(true)); } public Uni commit() { return Uni.createFrom().voidItem() - .onSubscribe().invoke(s -> onCompleteSubscribed.set(true)); + .onSubscription().invoke(s -> onCompleteSubscribed.set(true)); } public Uni commitFailure() { return Uni.createFrom().voidItem() .onItem().delayIt().by(DELAY) .onItem().failWith(x -> new IOException("commit failed")) - .onSubscribe().invoke(s -> onCompleteSubscribed.set(true)); + .onSubscription().invoke(s -> onCompleteSubscribed.set(true)); } public Uni commitReturningNull() { @@ -736,14 +736,14 @@ public Uni commitReturningNull() { public Uni rollback(Throwable failure) { return Uni.createFrom().voidItem() .onItem().invoke(x -> this.failure.set(failure)) - .onSubscribe().invoke(s -> onFailureSubscribed.set(true)); + .onSubscription().invoke(s -> onFailureSubscribed.set(true)); } public Uni rollbackDelay(Throwable failure) { return Uni.createFrom().voidItem() .onItem().invoke(x -> this.failure.set(failure)) .onItem().delayIt().by(DELAY) - .onSubscribe().invoke(s -> onFailureSubscribed.set(true)); + .onSubscription().invoke(s -> onFailureSubscribed.set(true)); } public Uni rollbackFailure(Throwable failure) { @@ -751,7 +751,7 @@ public Uni rollbackFailure(Throwable failure) { .onItem().invoke(x -> this.failure.set(failure)) .onItem().delayIt().by(DELAY) .onItem().failWith(x -> new IOException("rollback failed")) - .onSubscribe().invoke(s -> onFailureSubscribed.set(true)); + .onSubscription().invoke(s -> onFailureSubscribed.set(true)); } public Uni rollbackReturningNull(Throwable f) { @@ -761,7 +761,7 @@ public Uni rollbackReturningNull(Throwable f) { public Uni cancel() { return Uni.createFrom().voidItem() - .onSubscribe().invoke(s -> onCancelSubscribed.set(true)); + .onSubscription().invoke(s -> onCancelSubscribed.set(true)); } } } diff --git a/implementation/src/test/java/tck/MultiOnSubscribeCallTckTest.java b/implementation/src/test/java/tck/MultiOnSubscriptionCallTckTest.java similarity index 57% rename from implementation/src/test/java/tck/MultiOnSubscribeCallTckTest.java rename to implementation/src/test/java/tck/MultiOnSubscriptionCallTckTest.java index 57cbb58b9..3e0832045 100644 --- a/implementation/src/test/java/tck/MultiOnSubscribeCallTckTest.java +++ b/implementation/src/test/java/tck/MultiOnSubscriptionCallTckTest.java @@ -4,17 +4,17 @@ import io.smallrye.mutiny.Uni; -public class MultiOnSubscribeCallTckTest extends AbstractPublisherTck { +public class MultiOnSubscriptionCallTckTest extends AbstractPublisherTck { @Override public Publisher createPublisher(long elements) { return upstream(elements) - .onSubscribe().call(x -> Uni.createFrom().nullItem()); + .onSubscription().call(x -> Uni.createFrom().nullItem()); } @Override public Publisher createFailedPublisher() { return failedUpstream() - .onSubscribe().call(x -> Uni.createFrom().nullItem()); + .onSubscription().call(x -> Uni.createFrom().nullItem()); } } diff --git a/implementation/src/test/java/tck/MultiOnSubscribeInvokeTckTest.java b/implementation/src/test/java/tck/MultiOnSubscriptionInvokeTckTest.java similarity index 67% rename from implementation/src/test/java/tck/MultiOnSubscribeInvokeTckTest.java rename to implementation/src/test/java/tck/MultiOnSubscriptionInvokeTckTest.java index 174f67d8f..599bd8c25 100644 --- a/implementation/src/test/java/tck/MultiOnSubscribeInvokeTckTest.java +++ b/implementation/src/test/java/tck/MultiOnSubscriptionInvokeTckTest.java @@ -2,12 +2,12 @@ import org.reactivestreams.Publisher; -public class MultiOnSubscribeInvokeTckTest extends AbstractPublisherTck { +public class MultiOnSubscriptionInvokeTckTest extends AbstractPublisherTck { @Override public Publisher createPublisher(long elements) { return upstream(elements) - .onSubscribe().invoke(x -> { + .onSubscription().invoke(x -> { // noop }); } @@ -15,7 +15,7 @@ public Publisher createPublisher(long elements) { @Override public Publisher createFailedPublisher() { return failedUpstream() - .onSubscribe().invoke(x -> { + .onSubscription().invoke(x -> { // noop }); } From 93034257994db809191a028978e39965230bff9b Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 17 May 2021 12:12:55 +0200 Subject: [PATCH 2/4] Update implementation/src/main/java/io/smallrye/mutiny/Uni.java Co-authored-by: Clement Escoffier --- implementation/src/main/java/io/smallrye/mutiny/Uni.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/Uni.java b/implementation/src/main/java/io/smallrye/mutiny/Uni.java index 1ad15012f..1e0fdc9b7 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/Uni.java +++ b/implementation/src/main/java/io/smallrye/mutiny/Uni.java @@ -201,7 +201,7 @@ default CompletableFuture subscribeAsCompletionStage() { /** * Configures the action to execute when the observed {@link Uni} sends a {@link UniSubscription}. - * The downstream don't have a subscription yet. It will be passed once the configured action completes. + * The downstream does not have a subscription yet. It will be passed once the configured action completes. * *

* Example: From fbd941993100f99e8b9d4602ec679ce7292e20eb Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 17 May 2021 12:13:01 +0200 Subject: [PATCH 3/4] Update implementation/src/main/java/io/smallrye/mutiny/Multi.java Co-authored-by: Clement Escoffier --- implementation/src/main/java/io/smallrye/mutiny/Multi.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/Multi.java b/implementation/src/main/java/io/smallrye/mutiny/Multi.java index 65c62822c..bbb85c92d 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/Multi.java +++ b/implementation/src/main/java/io/smallrye/mutiny/Multi.java @@ -134,7 +134,7 @@ default O stage(Function, O> stage) { /** * Configures the action to execute when the observed {@link Multi} sends a {@link Subscription}. - * The downstream don't have a subscription yet. It will be passed once the configured action completes. + * The downstream does not have a subscription yet. It will be passed once the configured action completes. *

* For example: * From e0a7d82d83476cd4b6fc81350101fa9663a4d549 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 17 May 2021 12:15:18 +0200 Subject: [PATCH 4/4] Avoid duplication --- .../main/java/io/smallrye/mutiny/operators/AbstractMulti.java | 2 +- .../src/main/java/io/smallrye/mutiny/operators/AbstractUni.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java index b404e4dbd..e76ef5b57 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractMulti.java @@ -112,7 +112,7 @@ public MultiOverflow onOverflow() { @Override public MultiOnSubscribe onSubscribe() { - return new MultiOnSubscribe<>(this); + return onSubscription(); } @Override diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractUni.java b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractUni.java index 2a8f52dd5..e9e1cf51f 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractUni.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/AbstractUni.java @@ -71,7 +71,7 @@ public UniOnFailure onFailure(Class typeOfFailure) { @Override public UniOnSubscribe onSubscribe() { - return new UniOnSubscribe<>(this); + return onSubscription(); } @Override