Skip to content

Commit

Permalink
Merge pull request #379 from smallrye/internal/uni-refactor-to-UniSub…
Browse files Browse the repository at this point in the history
…scriber
  • Loading branch information
cescoffier authored Dec 8, 2020
2 parents 2615d01 + cd2831e commit 832e918
Show file tree
Hide file tree
Showing 47 changed files with 189 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.UniInterceptor;
import io.smallrye.mutiny.operators.AbstractUni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.subscription.UniSubscription;

Expand Down Expand Up @@ -44,7 +43,7 @@ public <T> Uni<T> onUniCreation(Uni<T> uni) {
Executor executor = getThreadContext().currentContextExecutor();
return new AbstractUni<T>() {
@Override
protected void subscribing(UniSerializedSubscriber<? super T> subscriber) {
protected void subscribing(UniSubscriber<? super T> subscriber) {
executor.execute(() -> AbstractUni.subscribe(uni, subscriber));
}
};
Expand Down
49 changes: 49 additions & 0 deletions implementation/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"io\\.smallrye\\.mutiny\\..*"
],
"exclude": [
"io\\.smallrye\\.mutiny\\.operators\\.*",
"io\\.smallrye\\.mutiny\\.operators\\..*"
]
}
Expand All @@ -32,6 +33,54 @@
"old": "method <T> io.smallrye.mutiny.helpers.Subscriptions.SingleItemSubscription<T> io.smallrye.mutiny.helpers.Subscriptions::single(org.reactivestreams.Subscriber<T>, T)",
"new": "method <T> org.reactivestreams.Subscription io.smallrye.mutiny.helpers.Subscriptions::single(org.reactivestreams.Subscriber<T>, T)",
"justification": "SingleItemSubscription should not be exposed, and should be used as a regular Subscription"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void io.smallrye.mutiny.helpers.spies.UniGlobalSpy<T>::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber<? super T>===)",
"new": "parameter void io.smallrye.mutiny.helpers.spies.UniGlobalSpy<T>::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber<? super T>===)",
"justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void io.smallrye.mutiny.helpers.spies.UniOnCancellationSpy<T>::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber<? super T>===)",
"new": "parameter void io.smallrye.mutiny.helpers.spies.UniOnCancellationSpy<T>::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber<? super T>===)",
"justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void io.smallrye.mutiny.helpers.spies.UniOnFailureSpy<T>::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber<? super T>===)",
"new": "parameter void io.smallrye.mutiny.helpers.spies.UniOnFailureSpy<T>::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber<? super T>===)",
"justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void io.smallrye.mutiny.helpers.spies.UniOnItemOrFailureSpy<T>::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber<? super T>===)",
"new": "parameter void io.smallrye.mutiny.helpers.spies.UniOnItemOrFailureSpy<T>::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber<? super T>===)",
"justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void io.smallrye.mutiny.helpers.spies.UniOnItemSpy<T>::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber<? super T>===)",
"new": "parameter void io.smallrye.mutiny.helpers.spies.UniOnItemSpy<T>::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber<? super T>===)",
"justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void io.smallrye.mutiny.helpers.spies.UniOnSubscribeSpy<T>::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber<? super T>===)",
"new": "parameter void io.smallrye.mutiny.helpers.spies.UniOnSubscribeSpy<T>::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber<? super T>===)",
"justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void io.smallrye.mutiny.helpers.spies.UniOnTerminationSpy<T>::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber<? super T>===)",
"new": "parameter void io.smallrye.mutiny.helpers.spies.UniOnTerminationSpy<T>::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber<? super T>===)",
"justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void io.smallrye.mutiny.operators.AbstractUni<T>::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber<? super T>===)",
"new": "parameter void io.smallrye.mutiny.operators.AbstractUni<T>::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber<? super T>===)",
"justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API"
}
]
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.mutiny.helpers.spies;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniGlobalSpy<T> extends UniSpyBase<T> {

Expand Down Expand Up @@ -68,7 +68,7 @@ public void reset() {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
onTerminationSpy.subscribe().withSubscriber(downstream);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.mutiny.helpers.spies;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniOnCancellationSpy<T> extends UniSpyBase<T> {

Expand All @@ -10,7 +10,7 @@ public class UniOnCancellationSpy<T> extends UniSpyBase<T> {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
upstream()
.onCancellation().invoke(this::incrementInvocationCount)
.subscribe().withSubscriber(downstream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniOnFailure;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniOnFailureSpy<T> extends UniSpyBase<T> {

Expand Down Expand Up @@ -38,7 +38,7 @@ public void reset() {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
UniOnFailure<? extends T> group;
if (predicate != null) {
group = upstream().onFailure(predicate);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.mutiny.helpers.spies;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniOnItemOrFailureSpy<T> extends UniSpyBase<T> {

Expand Down Expand Up @@ -32,7 +32,7 @@ public Throwable lastFailure() {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
upstream()
.onItemOrFailure().invoke((item, failure) -> {
synchronized (UniOnItemOrFailureSpy.this) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.mutiny.helpers.spies;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniOnItemSpy<T> extends UniSpyBase<T> {

Expand All @@ -22,7 +22,7 @@ public void reset() {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
upstream()
.onItem().invoke(item -> {
this.lastItem = item;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.mutiny.helpers.spies;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.subscription.UniSubscription;

public class UniOnSubscribeSpy<T> extends UniSpyBase<T> {
Expand All @@ -23,7 +23,7 @@ public void reset() {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
upstream()
.onSubscribe().invoke(uniSubscription -> {
incrementInvocationCount();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.mutiny.helpers.spies;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.tuples.Tuple3;

public class UniOnTerminationSpy<T> extends UniSpyBase<T> {
Expand Down Expand Up @@ -31,7 +31,7 @@ public void reset() {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
upstream()
.onTermination().invoke((i, f, c) -> {
incrementInvocationCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

public abstract class AbstractUni<T> implements Uni<T> {

protected abstract void subscribing(UniSerializedSubscriber<? super T> subscriber);
protected abstract void subscribing(UniSubscriber<? super T> subscriber);

/**
* Encapsulates subscription to slightly optimized the AbstractUni case.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public UniAndCombination(Uni<? extends I> upstream, List<? extends Uni<?>> other
}

@Override
protected void subscribing(UniSerializedSubscriber<? super O> subscriber) {
protected void subscribing(UniSubscriber<? super O> subscriber) {
AndSupervisor andSupervisor = new AndSupervisor(subscriber);
subscriber.onSubscribe(andSupervisor);
// Must wait until the subscriber get a subscription before subscribing to the sources.
Expand All @@ -51,11 +51,11 @@ protected void subscribing(UniSerializedSubscriber<? super O> subscriber) {
private class AndSupervisor implements UniSubscription {

private final List<UniHandler> handlers = new ArrayList<>();
private final UniSerializedSubscriber<? super O> subscriber;
private final UniSubscriber<? super O> subscriber;

AtomicBoolean cancelled = new AtomicBoolean();

AndSupervisor(UniSerializedSubscriber<? super O> sub) {
AndSupervisor(UniSubscriber<? super O> sub) {
subscriber = sub;

for (Uni u : unis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.function.Supplier;

import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniCreateFromCompletionStage<O> extends UniOperator<Void, O> {
private final Supplier<? extends CompletionStage<? extends O>> supplier;
Expand All @@ -17,7 +18,7 @@ public UniCreateFromCompletionStage(Supplier<? extends CompletionStage<? extends
}

private static <O> void forwardFromCompletionStage(CompletionStage<? extends O> stage,
UniSerializedSubscriber<? super O> subscriber) {
UniSubscriber<? super O> subscriber) {
subscriber.onSubscribe(() -> stage.toCompletableFuture().cancel(false));
stage.whenComplete((res, fail) -> {
if (fail != null) {
Expand All @@ -33,7 +34,7 @@ private static <O> void forwardFromCompletionStage(CompletionStage<? extends O>
}

@Override
protected void subscribing(UniSerializedSubscriber<? super O> subscriber) {
protected void subscribing(UniSubscriber<? super O> subscriber) {
CompletionStage<? extends O> stage;
try {
stage = supplier.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniCreateFromDeferredSupplier<T> extends UniOperator<Void, T> {
private final Supplier<Uni<? extends T>> supplier;
Expand All @@ -17,7 +18,7 @@ public UniCreateFromDeferredSupplier(Supplier<Uni<? extends T>> supplier) {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> subscriber) {
protected void subscribing(UniSubscriber<? super T> subscriber) {
nonNull(subscriber, "subscriber");
Uni<? extends T> uni;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.reactivestreams.Subscription;

import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniCreateFromPublisher<O> extends UniOperator<Void, O> {
private final Publisher<? extends O> publisher;
Expand All @@ -21,7 +22,7 @@ public UniCreateFromPublisher(Publisher<? extends O> publisher) {

@SuppressWarnings("SubscriberImplementation")
@Override
protected void subscribing(UniSerializedSubscriber<? super O> subscriber) {
protected void subscribing(UniSubscriber<? super O> subscriber) {
AtomicReference<Subscription> reference = new AtomicReference<>();
Subscriber<O> actual = new Subscriber<O>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.function.Consumer;

import io.smallrye.mutiny.subscription.UniEmitter;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniCreateWithEmitter<T> extends AbstractUni<T> {
private final Consumer<UniEmitter<? super T>> consumer;
Expand All @@ -14,7 +15,7 @@ public UniCreateWithEmitter(Consumer<UniEmitter<? super T>> consumer) {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> subscriber) {
protected void subscribing(UniSubscriber<? super T> subscriber) {
DefaultUniEmitter<? super T> emitter = new DefaultUniEmitter<>(subscriber);
subscriber.onSubscribe(emitter);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.concurrent.atomic.AtomicReference;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.subscription.UniSubscription;

public class UniDelayOnItem<T> extends UniOperator<T, T> {
Expand All @@ -24,7 +25,7 @@ public UniDelayOnItem(Uni<T> upstream, Duration duration, ScheduledExecutorServi
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> subscriber) {
protected void subscribing(UniSubscriber<? super T> subscriber) {
AtomicReference<ScheduledFuture<?>> holder = new AtomicReference<>();
AtomicReference<UniSubscription> reference = new AtomicReference<>();
AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber<T, T>(subscriber) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.function.Function;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.subscription.UniSubscription;

public class UniDelayUntil<T> extends UniOperator<T, T> {
Expand All @@ -22,7 +23,7 @@ public UniDelayUntil(Uni<T> upstream, Function<? super T, Uni<?>> function,
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> subscriber) {
protected void subscribing(UniSubscriber<? super T> subscriber) {
AtomicReference<UniSubscription> reference = new AtomicReference<>();
AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber<T, T>(subscriber) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

public class UniDelegatingSubscriber<I, O> implements UniSubscriber<I> {

private final UniSerializedSubscriber<? super O> delegate;
private final UniSubscriber<? super O> delegate;

public UniDelegatingSubscriber(UniSerializedSubscriber<? super O> subscriber) {
public UniDelegatingSubscriber(UniSubscriber<? super O> subscriber) {
this.delegate = nonNull(subscriber, "delegate");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.concurrent.Executor;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniEmitOn<I> extends UniOperator<I, I> {
private final Executor executor;
Expand All @@ -15,7 +16,7 @@ public class UniEmitOn<I> extends UniOperator<I, I> {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super I> subscriber) {
protected void subscribing(UniSubscriber<? super I> subscriber) {
AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber<I, I>(subscriber) {
@Override
public void onItem(I item) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public UniFailOnTimeout(Uni<I> upstream, Duration timeout, Supplier<? extends Th
}

@Override
protected void subscribing(UniSerializedSubscriber<? super I> subscriber) {
protected void subscribing(UniSubscriber<? super I> subscriber) {
AtomicBoolean doneOrCancelled = new AtomicBoolean();
AtomicReference<ScheduledFuture<?>> task = new AtomicReference<>();

Expand Down Expand Up @@ -92,7 +92,7 @@ public void onFailure(Throwable failure) {
});
}

private void sendTimeout(UniSerializedSubscriber<? super I> subscriber, UniSubscription subscription) {
private void sendTimeout(UniSubscriber<? super I> subscriber, UniSubscription subscription) {

// Cancel the upstream subscription
subscription.cancel();
Expand Down
Loading

0 comments on commit 832e918

Please sign in to comment.