Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor to UniSubscriber #379

Merged
merged 4 commits into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.UniInterceptor;
import io.smallrye.mutiny.operators.AbstractUni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.subscription.UniSubscription;

Expand Down Expand Up @@ -44,7 +43,7 @@ public <T> Uni<T> onUniCreation(Uni<T> uni) {
Executor executor = getThreadContext().currentContextExecutor();
return new AbstractUni<T>() {
@Override
protected void subscribing(UniSerializedSubscriber<? super T> subscriber) {
protected void subscribing(UniSubscriber<? super T> subscriber) {
executor.execute(() -> AbstractUni.subscribe(uni, subscriber));
}
};
Expand Down
49 changes: 49 additions & 0 deletions implementation/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"io\\.smallrye\\.mutiny\\..*"
],
"exclude": [
"io\\.smallrye\\.mutiny\\.operators\\.*",
"io\\.smallrye\\.mutiny\\.operators\\..*"
]
}
Expand All @@ -32,6 +33,54 @@
"old": "method <T> io.smallrye.mutiny.helpers.Subscriptions.SingleItemSubscription<T> io.smallrye.mutiny.helpers.Subscriptions::single(org.reactivestreams.Subscriber<T>, T)",
"new": "method <T> org.reactivestreams.Subscription io.smallrye.mutiny.helpers.Subscriptions::single(org.reactivestreams.Subscriber<T>, T)",
"justification": "SingleItemSubscription should not be exposed, and should be used as a regular Subscription"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void io.smallrye.mutiny.helpers.spies.UniGlobalSpy<T>::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber<? super T>===)",
"new": "parameter void io.smallrye.mutiny.helpers.spies.UniGlobalSpy<T>::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber<? super T>===)",
"justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void io.smallrye.mutiny.helpers.spies.UniOnCancellationSpy<T>::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber<? super T>===)",
"new": "parameter void io.smallrye.mutiny.helpers.spies.UniOnCancellationSpy<T>::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber<? super T>===)",
"justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void io.smallrye.mutiny.helpers.spies.UniOnFailureSpy<T>::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber<? super T>===)",
"new": "parameter void io.smallrye.mutiny.helpers.spies.UniOnFailureSpy<T>::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber<? super T>===)",
"justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void io.smallrye.mutiny.helpers.spies.UniOnItemOrFailureSpy<T>::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber<? super T>===)",
"new": "parameter void io.smallrye.mutiny.helpers.spies.UniOnItemOrFailureSpy<T>::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber<? super T>===)",
"justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void io.smallrye.mutiny.helpers.spies.UniOnItemSpy<T>::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber<? super T>===)",
"new": "parameter void io.smallrye.mutiny.helpers.spies.UniOnItemSpy<T>::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber<? super T>===)",
"justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void io.smallrye.mutiny.helpers.spies.UniOnSubscribeSpy<T>::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber<? super T>===)",
"new": "parameter void io.smallrye.mutiny.helpers.spies.UniOnSubscribeSpy<T>::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber<? super T>===)",
"justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void io.smallrye.mutiny.helpers.spies.UniOnTerminationSpy<T>::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber<? super T>===)",
"new": "parameter void io.smallrye.mutiny.helpers.spies.UniOnTerminationSpy<T>::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber<? super T>===)",
"justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void io.smallrye.mutiny.operators.AbstractUni<T>::subscribing(===io.smallrye.mutiny.operators.UniSerializedSubscriber<? super T>===)",
"new": "parameter void io.smallrye.mutiny.operators.AbstractUni<T>::subscribing(===io.smallrye.mutiny.subscription.UniSubscriber<? super T>===)",
"justification": "subscribing(subscriber) is a protected method that is not used by regular consumers of the Mutiny public API"
}
]
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.mutiny.helpers.spies;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniGlobalSpy<T> extends UniSpyBase<T> {

Expand Down Expand Up @@ -68,7 +68,7 @@ public void reset() {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
onTerminationSpy.subscribe().withSubscriber(downstream);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.mutiny.helpers.spies;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniOnCancellationSpy<T> extends UniSpyBase<T> {

Expand All @@ -10,7 +10,7 @@ public class UniOnCancellationSpy<T> extends UniSpyBase<T> {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
upstream()
.onCancellation().invoke(this::incrementInvocationCount)
.subscribe().withSubscriber(downstream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniOnFailure;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniOnFailureSpy<T> extends UniSpyBase<T> {

Expand Down Expand Up @@ -38,7 +38,7 @@ public void reset() {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
UniOnFailure<? extends T> group;
if (predicate != null) {
group = upstream().onFailure(predicate);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.mutiny.helpers.spies;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniOnItemOrFailureSpy<T> extends UniSpyBase<T> {

Expand Down Expand Up @@ -32,7 +32,7 @@ public Throwable lastFailure() {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
upstream()
.onItemOrFailure().invoke((item, failure) -> {
synchronized (UniOnItemOrFailureSpy.this) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.mutiny.helpers.spies;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniOnItemSpy<T> extends UniSpyBase<T> {

Expand All @@ -22,7 +22,7 @@ public void reset() {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
upstream()
.onItem().invoke(item -> {
this.lastItem = item;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.mutiny.helpers.spies;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.subscription.UniSubscription;

public class UniOnSubscribeSpy<T> extends UniSpyBase<T> {
Expand All @@ -23,7 +23,7 @@ public void reset() {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
upstream()
.onSubscribe().invoke(uniSubscription -> {
incrementInvocationCount();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.mutiny.helpers.spies;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.tuples.Tuple3;

public class UniOnTerminationSpy<T> extends UniSpyBase<T> {
Expand Down Expand Up @@ -31,7 +31,7 @@ public void reset() {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
upstream()
.onTermination().invoke((i, f, c) -> {
incrementInvocationCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

public abstract class AbstractUni<T> implements Uni<T> {

protected abstract void subscribing(UniSerializedSubscriber<? super T> subscriber);
protected abstract void subscribing(UniSubscriber<? super T> subscriber);

/**
* Encapsulates subscription to slightly optimized the AbstractUni case.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public UniAndCombination(Uni<? extends I> upstream, List<? extends Uni<?>> other
}

@Override
protected void subscribing(UniSerializedSubscriber<? super O> subscriber) {
protected void subscribing(UniSubscriber<? super O> subscriber) {
AndSupervisor andSupervisor = new AndSupervisor(subscriber);
subscriber.onSubscribe(andSupervisor);
// Must wait until the subscriber get a subscription before subscribing to the sources.
Expand All @@ -51,11 +51,11 @@ protected void subscribing(UniSerializedSubscriber<? super O> subscriber) {
private class AndSupervisor implements UniSubscription {

private final List<UniHandler> handlers = new ArrayList<>();
private final UniSerializedSubscriber<? super O> subscriber;
private final UniSubscriber<? super O> subscriber;

AtomicBoolean cancelled = new AtomicBoolean();

AndSupervisor(UniSerializedSubscriber<? super O> sub) {
AndSupervisor(UniSubscriber<? super O> sub) {
subscriber = sub;

for (Uni u : unis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.function.Supplier;

import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniCreateFromCompletionStage<O> extends UniOperator<Void, O> {
private final Supplier<? extends CompletionStage<? extends O>> supplier;
Expand All @@ -17,7 +18,7 @@ public UniCreateFromCompletionStage(Supplier<? extends CompletionStage<? extends
}

private static <O> void forwardFromCompletionStage(CompletionStage<? extends O> stage,
UniSerializedSubscriber<? super O> subscriber) {
UniSubscriber<? super O> subscriber) {
subscriber.onSubscribe(() -> stage.toCompletableFuture().cancel(false));
stage.whenComplete((res, fail) -> {
if (fail != null) {
Expand All @@ -33,7 +34,7 @@ private static <O> void forwardFromCompletionStage(CompletionStage<? extends O>
}

@Override
protected void subscribing(UniSerializedSubscriber<? super O> subscriber) {
protected void subscribing(UniSubscriber<? super O> subscriber) {
CompletionStage<? extends O> stage;
try {
stage = supplier.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniCreateFromDeferredSupplier<T> extends UniOperator<Void, T> {
private final Supplier<Uni<? extends T>> supplier;
Expand All @@ -17,7 +18,7 @@ public UniCreateFromDeferredSupplier(Supplier<Uni<? extends T>> supplier) {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> subscriber) {
protected void subscribing(UniSubscriber<? super T> subscriber) {
nonNull(subscriber, "subscriber");
Uni<? extends T> uni;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.reactivestreams.Subscription;

import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniCreateFromPublisher<O> extends UniOperator<Void, O> {
private final Publisher<? extends O> publisher;
Expand All @@ -21,7 +22,7 @@ public UniCreateFromPublisher(Publisher<? extends O> publisher) {

@SuppressWarnings("SubscriberImplementation")
@Override
protected void subscribing(UniSerializedSubscriber<? super O> subscriber) {
protected void subscribing(UniSubscriber<? super O> subscriber) {
AtomicReference<Subscription> reference = new AtomicReference<>();
Subscriber<O> actual = new Subscriber<O>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.function.Consumer;

import io.smallrye.mutiny.subscription.UniEmitter;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniCreateWithEmitter<T> extends AbstractUni<T> {
private final Consumer<UniEmitter<? super T>> consumer;
Expand All @@ -14,7 +15,7 @@ public UniCreateWithEmitter(Consumer<UniEmitter<? super T>> consumer) {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> subscriber) {
protected void subscribing(UniSubscriber<? super T> subscriber) {
DefaultUniEmitter<? super T> emitter = new DefaultUniEmitter<>(subscriber);
subscriber.onSubscribe(emitter);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.concurrent.atomic.AtomicReference;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.subscription.UniSubscription;

public class UniDelayOnItem<T> extends UniOperator<T, T> {
Expand All @@ -24,7 +25,7 @@ public UniDelayOnItem(Uni<T> upstream, Duration duration, ScheduledExecutorServi
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> subscriber) {
protected void subscribing(UniSubscriber<? super T> subscriber) {
AtomicReference<ScheduledFuture<?>> holder = new AtomicReference<>();
AtomicReference<UniSubscription> reference = new AtomicReference<>();
AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber<T, T>(subscriber) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.function.Function;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.subscription.UniSubscription;

public class UniDelayUntil<T> extends UniOperator<T, T> {
Expand All @@ -22,7 +23,7 @@ public UniDelayUntil(Uni<T> upstream, Function<? super T, Uni<?>> function,
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> subscriber) {
protected void subscribing(UniSubscriber<? super T> subscriber) {
AtomicReference<UniSubscription> reference = new AtomicReference<>();
AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber<T, T>(subscriber) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

public class UniDelegatingSubscriber<I, O> implements UniSubscriber<I> {

private final UniSerializedSubscriber<? super O> delegate;
private final UniSubscriber<? super O> delegate;

public UniDelegatingSubscriber(UniSerializedSubscriber<? super O> subscriber) {
public UniDelegatingSubscriber(UniSubscriber<? super O> subscriber) {
this.delegate = nonNull(subscriber, "delegate");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.concurrent.Executor;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniEmitOn<I> extends UniOperator<I, I> {
private final Executor executor;
Expand All @@ -15,7 +16,7 @@ public class UniEmitOn<I> extends UniOperator<I, I> {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super I> subscriber) {
protected void subscribing(UniSubscriber<? super I> subscriber) {
AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber<I, I>(subscriber) {
@Override
public void onItem(I item) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public UniFailOnTimeout(Uni<I> upstream, Duration timeout, Supplier<? extends Th
}

@Override
protected void subscribing(UniSerializedSubscriber<? super I> subscriber) {
protected void subscribing(UniSubscriber<? super I> subscriber) {
AtomicBoolean doneOrCancelled = new AtomicBoolean();
AtomicReference<ScheduledFuture<?>> task = new AtomicReference<>();

Expand Down Expand Up @@ -92,7 +92,7 @@ public void onFailure(Throwable failure) {
});
}

private void sendTimeout(UniSerializedSubscriber<? super I> subscriber, UniSubscription subscription) {
private void sendTimeout(UniSubscriber<? super I> subscriber, UniSubscription subscription) {

// Cancel the upstream subscription
subscription.cancel();
Expand Down
Loading