Skip to content

Commit

Permalink
Merge pull request #563 from smallrye/rename-onSubscribe
Browse files Browse the repository at this point in the history
Deprecate onSubscribe() in favor or onSubscription()
  • Loading branch information
cescoffier authored May 17, 2021
2 parents 5378833 + e0a7d82 commit d25fbea
Show file tree
Hide file tree
Showing 34 changed files with 202 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public void testOnSubscribe() {

AssertSubscriber<Long> subscriber = Multi.createFrom().ticks().every(Duration.ofMillis(1))
.select().first(5)
.onSubscribe().invoke(() -> {
.onSubscription().invoke(() -> {
assertThat(ctx).isSameAs(MyContext.get());
MyContext.get().set("test");
})
Expand Down
4 changes: 2 additions & 2 deletions documentation/src/main/jekyll/guides/logging.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Here the `log` operator traces all events between the `onItem().transform(...)`

[source,text]
----
11:01:48.709 [main] INFO Multi.MultiMapOp.0 - onSubscribe()
11:01:48.709 [main] INFO Multi.MultiMapOp.0 - onSubscription()
11:01:48.711 [main] INFO Multi.MultiMapOp.0 - request(9223372036854775807)
11:01:48.711 [main] INFO Multi.MultiMapOp.0 - onItem(10)
>>> 10
Expand All @@ -46,7 +46,7 @@ Events are written by default to the standard console output in a format similar

[source,text]
----
[--> Multi.MultiMapOp.0 | onSubscribe()
[--> Multi.MultiMapOp.0 | onSubscription()
[--> Multi.MultiMapOp.0 | request(9223372036854775807)
[--> Multi.MultiMapOp.0 | onItem(10)
[--> Multi.MultiMapOp.0 | onItem(20)
Expand Down
2 changes: 1 addition & 1 deletion documentation/src/test/java/guides/EventsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void test(SystemOut out) {
.invoke(failure -> log("Failed with " + failure))
.onCompletion() // Called when the stream completes
.invoke(() -> log("Completed"))
.onSubscribe() // Called when the upstream is ready
.onSubscription() // Called when the upstream is ready
.invoke(subscription -> log("We are subscribed!"))
.onCancellation() // Called when the downstream cancels
.invoke(() -> log("Cancelled :-("))
Expand Down
2 changes: 1 addition & 1 deletion documentation/src/test/java/guides/ObserveTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void all() {
Multi<String> multi = Multi.createFrom().items("a", "b", "c");
// tag::invoke-all[]
multi
.onSubscribe().invoke(() -> System.out.println("⬇️ Subscribed"))
.onSubscription().invoke(() -> System.out.println("⬇️ Subscribed"))
.onItem().invoke(i -> System.out.println("⬇️ Received item: " + i))
.onFailure().invoke(f -> System.out.println("⬇️ Failed with " + f))
.onCompletion().invoke(() -> System.out.println("⬇️ Completed"))
Expand Down
15 changes: 14 additions & 1 deletion implementation/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,20 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [
{
"ignore": true,
"code": "java.method.addedToInterface",
"new": "method io.smallrye.mutiny.groups.MultiOnSubscribe<T> io.smallrye.mutiny.Multi<T>::onSubscription()",
"justification": "Deprecated API"
},
{
"ignore": true,
"code": "java.method.addedToInterface",
"new": "method io.smallrye.mutiny.groups.UniOnSubscribe<T> io.smallrye.mutiny.Uni<T>::onSubscription()",
"justification": "Deprecated API"
}
]
}
}, {
"extension" : "revapi.reporter.json",
Expand Down
20 changes: 20 additions & 0 deletions implementation/src/main/java/io/smallrye/mutiny/Multi.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,29 @@ default <O> O stage(Function<Multi<T>, O> stage) {
* </pre>
*
* @return the object to configure the action to execution on subscription.
* @deprecated use {@link #onSubscription()} instead
*/
@Deprecated
MultiOnSubscribe<T> onSubscribe();

/**
* Configures the action to execute when the observed {@link Multi} sends a {@link Subscription}.
* The downstream does not have a subscription yet. It will be passed once the configured action completes.
* <p>
* For example:
*
* <pre>
* {@code
* multi.onSubscription().invoke(sub -> System.out.println("subscribed"));
* // Delay the subscription by 1 second (or until an asynchronous action completes)
* multi.onSubscription().call(sub -> Uni.createFrom(1).onItem().delayIt().by(Duration.ofSecond(1)));
* }
* </pre>
*
* @return the object to configure the action to execution on subscription.
*/
MultiOnSubscribe<T> onSubscription();

/**
* Configures a type of failure filtering the failures on which the behavior (specified with the returned
* {@link MultiOnFailure}) is applied.
Expand Down
22 changes: 22 additions & 0 deletions implementation/src/main/java/io/smallrye/mutiny/Uni.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,31 @@ default CompletableFuture<T> subscribeAsCompletionStage() {
* </pre>
*
* @return the object to configure the action to execution on subscription.
* @deprecated use {@link #onSubscription()} instead
*/
@Deprecated
UniOnSubscribe<T> onSubscribe();

/**
* Configures the action to execute when the observed {@link Uni} sends a {@link UniSubscription}.
* The downstream does not have a subscription yet. It will be passed once the configured action completes.
*
* <p>
* Example:
* </p>
*
* <pre>
* {@code
* uni.onSubscription().invoke(sub -> System.out.println("subscribed"));
* // Delay the subscription by 1 second (or until an asynchronous action completes)
* uni.onSubscription().call(sub -> Uni.createFrom(1).onItem().delayIt().by(Duration.ofSecond(1)));
* }
* </pre>
*
* @return the object to configure the action to execution on subscription.
*/
UniOnSubscribe<T> onSubscription();

/**
* Configures the action to execute when the observed {@link Uni} emits either an item (potentially {@code null}))
* or a failure. Unlike {@link #onItem()} and {@link #onFailure()} the action would handle both cases in on "go".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
*
* <pre>
* {@code
* multi.onSubscribe().invoke(sub -> System.out.println("subscribed"));
* multi.onSubscription().invoke(sub -> System.out.println("subscribed"));
* // Delay the subscription by 1 second (or until an asynchronous action completes)
* multi.onSubscribe().call(sub -> Uni.createFrom(1).onItem().delayIt().by(Duration.ofSecond(1)));
* multi.onSubscription().call(sub -> Uni.createFrom(1).onItem().delayIt().by(Duration.ofSecond(1)));
* }
* </pre>
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
*
* <pre>
* {@code
* uni.onSubscribe().invoke(sub -> System.out.println("subscribed"));
* uni.onSubscription().invoke(sub -> System.out.println("subscribed"));
* // Delay the subscription by 1 second (or until an asynchronous action completes)
* uni.onSubscribe().call(sub -> Uni.createFrom(1).onItem().delayIt().by(Duration.ofSecond(1)));
* uni.onSubscription().call(sub -> Uni.createFrom(1).onItem().delayIt().by(Duration.ofSecond(1)));
*}
* </pre>
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void reset() {

@Override
public void subscribe(MultiSubscriber<? super T> downstream) {
upstream.onSubscribe().invoke(subscription -> {
upstream.onSubscription().invoke(subscription -> {
incrementInvocationCount();
lastSubscription = subscription;
}).subscribe().withSubscriber(downstream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface Spy {
// --------------------------------------------------------------------- //

/**
* Spy {@link Uni#onSubscribe()} events.
* Spy {@link Uni#onSubscription()} events.
*
* @param upstream the upstream
* @param <T> the item type
Expand Down Expand Up @@ -219,7 +219,7 @@ static <T> MultiOnRequestSpy<T> onRequest(Multi<T> upstream) {
}

/**
* Spy {@link Multi#onSubscribe()} events.
* Spy {@link Multi#onSubscription()} events.
*
* @param upstream the upstream
* @param <T> the items type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public void reset() {
@Override
public void subscribe(UniSubscriber<? super T> downstream) {
upstream()
.onSubscribe().invoke(uniSubscription -> {
.onSubscription().invoke(uniSubscription -> {
incrementInvocationCount();
lastSubscription = uniSubscription;
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public MultiOverflow<T> onOverflow() {

@Override
public MultiOnSubscribe<T> onSubscribe() {
return onSubscription();
}

@Override
public MultiOnSubscribe<T> onSubscription() {
return new MultiOnSubscribe<>(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public UniOnFailure<T> onFailure(Class<? extends Throwable> typeOfFailure) {

@Override
public UniOnSubscribe<T> onSubscribe() {
return onSubscription();
}

@Override
public UniOnSubscribe<T> onSubscription() {
return new UniOnSubscribe<>(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void onFailure(Throwable throwable) {
public void cancel() {
if (state.compareAndSet(HAS_SUBSCRIPTION, DONE)) {
while (subscription == null) {
// We are in the middle of a race condition with onSubscribe()
// We are in the middle of a race condition with onSubscription()
}
if (subscription != null) { // May have been cancelled already by another thread.
subscription.cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public void testWithMultis() {
Multi.createFrom().items("d", "e"),
Multi.createFrom().empty(),
Multi.createFrom().items("f", "g")
.onSubscribe().invoke(() -> subscribed.set(true)))
.onSubscription().invoke(() -> subscribed.set(true)))
.onItem().<String> disjoint()
.subscribe().withSubscriber(AssertSubscriber.create(4));
assertThat(subscribed).isFalse();
Expand Down Expand Up @@ -95,7 +95,7 @@ public void testWithMultisWithAFailure() {
Multi.createFrom().items("d", "e"),
Multi.createFrom().failure(new IOException("boom")),
Multi.createFrom().items("f", "g")
.onSubscribe().invoke(() -> subscribed.set(true)))
.onSubscription().invoke(() -> subscribed.set(true)))
.onItem().<String> disjoint()
.subscribe().withSubscriber(AssertSubscriber.create(4));
assertThat(subscribed).isFalse();
Expand All @@ -116,7 +116,7 @@ public void testWithMultisWithOneEmittingAFailure() {
e.fail(new IOException("boom"));
}),
Multi.createFrom().items("g")
.onSubscribe().invoke(() -> subscribed.set(true)))
.onSubscription().invoke(() -> subscribed.set(true)))
.onItem().<String> disjoint()
.subscribe().withSubscriber(AssertSubscriber.create(4));
assertThat(subscribed).isFalse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class SpyTest {
@Nested
class SpyUni {
@Test
@DisplayName("Spy onSubscribe()")
@DisplayName("Spy onSubscription()")
void spyOnSubscribeSpy() {
UniOnSubscribeSpy<Integer> spy = Spy.onSubscribe(Uni.createFrom().item(69));
UniAssertSubscriber<Integer> subscriber = spy.subscribe().withSubscriber(UniAssertSubscriber.create());
Expand Down Expand Up @@ -489,7 +489,7 @@ void spyOnRequest() {
}

@Test
@DisplayName("Spy onSubscribe()")
@DisplayName("Spy onSubscription()")
void spyOnSubscribe() {
MultiOnSubscribeSpy<Integer> spy = Spy.onSubscribe(Multi.createFrom().items(1, 2, 3));
AssertSubscriber<Integer> subscriber = spy.subscribe().withSubscriber(AssertSubscriber.create(10));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,18 +323,18 @@ public void testAwaitSubscription() {

// Delay
subscriber = Multi.createFrom().items(1)
.onSubscribe().call(x -> smallDelay)
.onSubscription().call(x -> smallDelay)
.subscribe().withSubscriber(AssertSubscriber.create(0));
assertThat(subscriber.awaitSubscription()).isSameAs(subscriber);

subscriber = Multi.createFrom().items(1)
.onSubscribe().call(x -> smallDelay)
.onSubscription().call(x -> smallDelay)
.subscribe().withSubscriber(AssertSubscriber.create(0));
assertThat(subscriber.awaitSubscription(MEDIUM)).isSameAs(subscriber);

// timeout
subscriber = Multi.createFrom().items(1)
.onSubscribe().call(x -> mediumDelay)
.onSubscription().call(x -> mediumDelay)
.subscribe().withSubscriber(AssertSubscriber.create(0));
AssertSubscriber<Integer> tmp = subscriber;
assertThatThrownBy(() -> tmp.awaitSubscription(SMALL)).isInstanceOf(AssertionError.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,18 +244,18 @@ public void testAwaitSubscription() {

// Delay
subscriber = Uni.createFrom().item(1)
.onSubscribe().call(x -> Uni.createFrom().item(x).onItem().delayIt().by(Duration.ofSeconds(1)))
.onSubscription().call(x -> Uni.createFrom().item(x).onItem().delayIt().by(Duration.ofSeconds(1)))
.subscribe().withSubscriber(UniAssertSubscriber.create());
assertThat(subscriber.awaitSubscription()).isSameAs(subscriber);

subscriber = Uni.createFrom().item(1)
.onSubscribe().call(x -> Uni.createFrom().item(x).onItem().delayIt().by(Duration.ofSeconds(1)))
.onSubscription().call(x -> Uni.createFrom().item(x).onItem().delayIt().by(Duration.ofSeconds(1)))
.subscribe().withSubscriber(UniAssertSubscriber.create());
assertThat(subscriber.awaitSubscription(Duration.ofSeconds(5))).isSameAs(subscriber);

// timeout
subscriber = Uni.createFrom().item(1)
.onSubscribe().call(x -> Uni.createFrom().item(x).onItem().delayIt().by(Duration.ofSeconds(10)))
.onSubscription().call(x -> Uni.createFrom().item(x).onItem().delayIt().by(Duration.ofSeconds(10)))
.subscribe().withSubscriber(UniAssertSubscriber.create());
UniAssertSubscriber<Integer> tmp = subscriber;
assertThatThrownBy(() -> tmp.awaitSubscription(Duration.ofMillis(100))).isInstanceOf(AssertionError.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void testCallbacksWhenItemIsEmitted() {
AtomicBoolean cancellation = new AtomicBoolean();

Multi.createFrom().item(1)
.onSubscribe().invoke(subscription::set)
.onSubscription().invoke(subscription::set)
.onItem().invoke(item::set)
.onFailure().invoke(failure::set)
.onItem().invoke(() -> invokedOnItemRunnable.set(true))
Expand Down Expand Up @@ -96,7 +96,7 @@ public void testCallbacksWhenItemIsEmittedUsingOnAndThenGroup() {
AtomicBoolean cancellation = new AtomicBoolean();

Multi.createFrom().item(1)
.onSubscribe().invoke(subscription::set)
.onSubscription().invoke(subscription::set)
.onItem().invoke(item::set)
.onFailure().invoke(failure::set)
.onTermination().invoke(() -> completion.set(true))
Expand Down Expand Up @@ -135,7 +135,7 @@ public void testCallbacksWhenItemIsEmittedWithDeprecatedApis() {
AtomicBoolean cancellation = new AtomicBoolean();

Multi.createFrom().item(1)
.onSubscribe().invoke(subscription::set)
.onSubscription().invoke(subscription::set)
.onItem().invoke(item::set)
.onFailure().invoke(failure::set)
.onCompletion().invoke(() -> completion.set(true))
Expand Down Expand Up @@ -172,7 +172,7 @@ public void testCallbacksOnFailure() {
AtomicBoolean cancellation = new AtomicBoolean();

Multi.createFrom().<Integer> failure(new IOException("boom"))
.onSubscribe().invoke(subscription::set)
.onSubscription().invoke(subscription::set)
.onItem().invoke(item::set)
.onFailure().invoke(failure::set)
.onCompletion().invoke(() -> completion.set(true))
Expand Down Expand Up @@ -204,7 +204,7 @@ public void testCallbacksOnFailureWhenPredicateMatches() {
AtomicBoolean cancellation = new AtomicBoolean();

Multi.createFrom().<Integer> failure(new IOException("boom"))
.onSubscribe().invoke(subscription::set)
.onSubscription().invoke(subscription::set)
.onItem().invoke(item::set)
.onFailure(IOException.class).invoke(failure::set)
.onCompletion().invoke(() -> completion.set(true))
Expand Down Expand Up @@ -234,7 +234,7 @@ public void testCallbacksOnFailureWhenPredicateDoesNotPass() {
AtomicBoolean cancellation = new AtomicBoolean();

Multi.createFrom().<Integer> failure(new IOException("boom"))
.onSubscribe().invoke(subscription::set)
.onSubscription().invoke(subscription::set)
.onItem().invoke(item::set)
.onFailure(f -> f.getMessage().contains("missing")).invoke(failure::set)
.onCompletion().invoke(() -> completion.set(true))
Expand Down Expand Up @@ -268,7 +268,7 @@ public void testCallbacksOnFailureWhenPredicateThrowsAnException() {
};

Multi.createFrom().<Integer> failure(new IOException("smallboom"))
.onSubscribe().invoke(subscription::set)
.onSubscription().invoke(subscription::set)
.onItem().invoke(item::set)
.onFailure(boom).invoke(failure::set)
.onCompletion().invoke(() -> completion.set(true))
Expand Down Expand Up @@ -300,7 +300,7 @@ public void testCallbacksOnCompletion() {
AtomicBoolean cancellation = new AtomicBoolean();

Multi.createFrom().<Integer> empty()
.onSubscribe().invoke(subscription::set)
.onSubscription().invoke(subscription::set)
.onItem().invoke(item::set)
.onFailure().invoke(failure::set)
.onCompletion().invoke(() -> completion.set(true))
Expand Down Expand Up @@ -334,7 +334,7 @@ public void testWithNoEvents() {
AtomicBoolean cancellation = new AtomicBoolean();

AssertSubscriber<Integer> subscriber = Multi.createFrom().<Integer> nothing()
.onSubscribe().invoke(subscription::set)
.onSubscription().invoke(subscription::set)
.onItem().invoke(item::set)
.onFailure().invoke(failure::set)
.onCompletion().invoke(() -> completion.set(true))
Expand Down
Loading

0 comments on commit d25fbea

Please sign in to comment.