Skip to content

Commit

Permalink
Refactor the Uni codebase to generalise on UniSubscriber instead of U…
Browse files Browse the repository at this point in the history
…niSerializedSubscriber
  • Loading branch information
jponge committed Dec 7, 2020
1 parent 3ef50de commit ac80ee9
Show file tree
Hide file tree
Showing 43 changed files with 123 additions and 89 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.mutiny.helpers.spies;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniGlobalSpy<T> extends UniSpyBase<T> {

Expand Down Expand Up @@ -68,7 +68,7 @@ public void reset() {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
onTerminationSpy.subscribe().withSubscriber(downstream);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.mutiny.helpers.spies;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniOnCancellationSpy<T> extends UniSpyBase<T> {

Expand All @@ -10,7 +10,7 @@ public class UniOnCancellationSpy<T> extends UniSpyBase<T> {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
upstream()
.onCancellation().invoke(this::incrementInvocationCount)
.subscribe().withSubscriber(downstream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniOnFailure;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniOnFailureSpy<T> extends UniSpyBase<T> {

Expand Down Expand Up @@ -38,7 +38,7 @@ public void reset() {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
UniOnFailure<? extends T> group;
if (predicate != null) {
group = upstream().onFailure(predicate);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.mutiny.helpers.spies;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniOnItemOrFailureSpy<T> extends UniSpyBase<T> {

Expand Down Expand Up @@ -32,7 +32,7 @@ public Throwable lastFailure() {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
upstream()
.onItemOrFailure().invoke((item, failure) -> {
synchronized (UniOnItemOrFailureSpy.this) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.mutiny.helpers.spies;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniOnItemSpy<T> extends UniSpyBase<T> {

Expand All @@ -22,7 +22,7 @@ public void reset() {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
upstream()
.onItem().invoke(item -> {
this.lastItem = item;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.mutiny.helpers.spies;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.subscription.UniSubscription;

public class UniOnSubscribeSpy<T> extends UniSpyBase<T> {
Expand All @@ -23,7 +23,7 @@ public void reset() {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
upstream()
.onSubscribe().invoke(uniSubscription -> {
incrementInvocationCount();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.smallrye.mutiny.helpers.spies;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.UniSerializedSubscriber;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.tuples.Tuple3;

public class UniOnTerminationSpy<T> extends UniSpyBase<T> {
Expand Down Expand Up @@ -31,7 +31,7 @@ public void reset() {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> downstream) {
protected void subscribing(UniSubscriber<? super T> downstream) {
upstream()
.onTermination().invoke((i, f, c) -> {
incrementInvocationCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

public abstract class AbstractUni<T> implements Uni<T> {

protected abstract void subscribing(UniSerializedSubscriber<? super T> subscriber);
protected abstract void subscribing(UniSubscriber<? super T> subscriber);

/**
* Encapsulates subscription to slightly optimized the AbstractUni case.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public UniAndCombination(Uni<? extends I> upstream, List<? extends Uni<?>> other
}

@Override
protected void subscribing(UniSerializedSubscriber<? super O> subscriber) {
protected void subscribing(UniSubscriber<? super O> subscriber) {
AndSupervisor andSupervisor = new AndSupervisor(subscriber);
subscriber.onSubscribe(andSupervisor);
// Must wait until the subscriber get a subscription before subscribing to the sources.
Expand All @@ -51,11 +51,11 @@ protected void subscribing(UniSerializedSubscriber<? super O> subscriber) {
private class AndSupervisor implements UniSubscription {

private final List<UniHandler> handlers = new ArrayList<>();
private final UniSerializedSubscriber<? super O> subscriber;
private final UniSubscriber<? super O> subscriber;

AtomicBoolean cancelled = new AtomicBoolean();

AndSupervisor(UniSerializedSubscriber<? super O> sub) {
AndSupervisor(UniSubscriber<? super O> sub) {
subscriber = sub;

for (Uni u : unis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.function.Supplier;

import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniCreateFromCompletionStage<O> extends UniOperator<Void, O> {
private final Supplier<? extends CompletionStage<? extends O>> supplier;
Expand All @@ -17,7 +18,7 @@ public UniCreateFromCompletionStage(Supplier<? extends CompletionStage<? extends
}

private static <O> void forwardFromCompletionStage(CompletionStage<? extends O> stage,
UniSerializedSubscriber<? super O> subscriber) {
UniSubscriber<? super O> subscriber) {
subscriber.onSubscribe(() -> stage.toCompletableFuture().cancel(false));
stage.whenComplete((res, fail) -> {
if (fail != null) {
Expand All @@ -33,7 +34,7 @@ private static <O> void forwardFromCompletionStage(CompletionStage<? extends O>
}

@Override
protected void subscribing(UniSerializedSubscriber<? super O> subscriber) {
protected void subscribing(UniSubscriber<? super O> subscriber) {
CompletionStage<? extends O> stage;
try {
stage = supplier.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniCreateFromDeferredSupplier<T> extends UniOperator<Void, T> {
private final Supplier<Uni<? extends T>> supplier;
Expand All @@ -17,7 +18,7 @@ public UniCreateFromDeferredSupplier(Supplier<Uni<? extends T>> supplier) {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> subscriber) {
protected void subscribing(UniSubscriber<? super T> subscriber) {
nonNull(subscriber, "subscriber");
Uni<? extends T> uni;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.reactivestreams.Subscription;

import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniCreateFromPublisher<O> extends UniOperator<Void, O> {
private final Publisher<? extends O> publisher;
Expand All @@ -21,7 +22,7 @@ public UniCreateFromPublisher(Publisher<? extends O> publisher) {

@SuppressWarnings("SubscriberImplementation")
@Override
protected void subscribing(UniSerializedSubscriber<? super O> subscriber) {
protected void subscribing(UniSubscriber<? super O> subscriber) {
AtomicReference<Subscription> reference = new AtomicReference<>();
Subscriber<O> actual = new Subscriber<O>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.function.Consumer;

import io.smallrye.mutiny.subscription.UniEmitter;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniCreateWithEmitter<T> extends AbstractUni<T> {
private final Consumer<UniEmitter<? super T>> consumer;
Expand All @@ -14,7 +15,7 @@ public UniCreateWithEmitter(Consumer<UniEmitter<? super T>> consumer) {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> subscriber) {
protected void subscribing(UniSubscriber<? super T> subscriber) {
DefaultUniEmitter<? super T> emitter = new DefaultUniEmitter<>(subscriber);
subscriber.onSubscribe(emitter);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.concurrent.atomic.AtomicReference;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.subscription.UniSubscription;

public class UniDelayOnItem<T> extends UniOperator<T, T> {
Expand All @@ -24,7 +25,7 @@ public UniDelayOnItem(Uni<T> upstream, Duration duration, ScheduledExecutorServi
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> subscriber) {
protected void subscribing(UniSubscriber<? super T> subscriber) {
AtomicReference<ScheduledFuture<?>> holder = new AtomicReference<>();
AtomicReference<UniSubscription> reference = new AtomicReference<>();
AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber<T, T>(subscriber) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.function.Function;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.subscription.UniSubscription;

public class UniDelayUntil<T> extends UniOperator<T, T> {
Expand All @@ -22,7 +23,7 @@ public UniDelayUntil(Uni<T> upstream, Function<? super T, Uni<?>> function,
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> subscriber) {
protected void subscribing(UniSubscriber<? super T> subscriber) {
AtomicReference<UniSubscription> reference = new AtomicReference<>();
AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber<T, T>(subscriber) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

public class UniDelegatingSubscriber<I, O> implements UniSubscriber<I> {

private final UniSerializedSubscriber<? super O> delegate;
private final UniSubscriber<? super O> delegate;

public UniDelegatingSubscriber(UniSerializedSubscriber<? super O> subscriber) {
public UniDelegatingSubscriber(UniSubscriber<? super O> subscriber) {
this.delegate = nonNull(subscriber, "delegate");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.concurrent.Executor;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniEmitOn<I> extends UniOperator<I, I> {
private final Executor executor;
Expand All @@ -15,7 +16,7 @@ public class UniEmitOn<I> extends UniOperator<I, I> {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super I> subscriber) {
protected void subscribing(UniSubscriber<? super I> subscriber) {
AbstractUni.subscribe(upstream(), new UniDelegatingSubscriber<I, I>(subscriber) {
@Override
public void onItem(I item) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public UniFailOnTimeout(Uni<I> upstream, Duration timeout, Supplier<? extends Th
}

@Override
protected void subscribing(UniSerializedSubscriber<? super I> subscriber) {
protected void subscribing(UniSubscriber<? super I> subscriber) {
AtomicBoolean doneOrCancelled = new AtomicBoolean();
AtomicReference<ScheduledFuture<?>> task = new AtomicReference<>();

Expand Down Expand Up @@ -92,7 +92,7 @@ public void onFailure(Throwable failure) {
});
}

private void sendTimeout(UniSerializedSubscriber<? super I> subscriber, UniSubscription subscription) {
private void sendTimeout(UniSubscriber<? super I> subscriber, UniSubscription subscription) {

// Cancel the upstream subscription
subscription.cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ private enum State {
private final AtomicReference<State> state = new AtomicReference<>(State.INIT);

private final AtomicInteger wip = new AtomicInteger();
private final List<UniSerializedSubscriber<? super I>> awaitingSubscription = synchronizedList(new ArrayList<>());
private final List<UniSerializedSubscriber<? super I>> awaitingResult = synchronizedList(new ArrayList<>());
private final List<UniSubscriber<? super I>> awaitingSubscription = synchronizedList(new ArrayList<>());
private final List<UniSubscriber<? super I>> awaitingResult = synchronizedList(new ArrayList<>());

private volatile UniSubscription upstreamSubscription;
private volatile I item;
Expand All @@ -45,7 +45,7 @@ public UniMemoizeOp(Uni<? extends I> upstream, BooleanSupplier invalidationReque
}

@Override
protected void subscribing(UniSerializedSubscriber<? super I> subscriber) {
protected void subscribing(UniSubscriber<? super I> subscriber) {
if (invalidationRequested.getAsBoolean() && state.get() != State.SUBSCRIBING) {
state.set(State.INIT);
if (upstreamSubscription != null) {
Expand Down Expand Up @@ -89,7 +89,7 @@ private void drain() {
int missed = 1;
for (;;) {

ArrayList<UniSerializedSubscriber<? super I>> subscribers;
ArrayList<UniSubscriber<? super I>> subscribers;
I currentItem;
Throwable currentFailure;

Expand All @@ -100,7 +100,7 @@ private void drain() {
}

// Handle the subscribers that are awaiting a subscription
for (UniSerializedSubscriber<? super I> subscriber : subscribers) {
for (UniSubscriber<? super I> subscriber : subscribers) {
currentItem = item;
currentFailure = failure;
State state = this.state.get();
Expand Down Expand Up @@ -135,7 +135,7 @@ private void drain() {
subscribers = new ArrayList<>(awaitingResult);
}
// Handle the subscribers that are awaiting a result
for (UniSerializedSubscriber<? super I> subscriber : subscribers) {
for (UniSubscriber<? super I> subscriber : subscribers) {
currentItem = item;
currentFailure = failure;
if (state.get() == State.CACHING) {
Expand All @@ -156,7 +156,7 @@ private void drain() {
}
}

private void removeFromAwaitingLists(UniSerializedSubscriber<? super I> subscriber) {
private void removeFromAwaitingLists(UniSubscriber<? super I> subscriber) {
awaitingSubscription.remove(subscriber);
awaitingResult.remove(subscriber);
drain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static io.smallrye.mutiny.helpers.EmptyUniSubscription.CANCELLED;

import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniNever<T> extends AbstractUni<T> {
public static final UniNever<Object> INSTANCE = new UniNever<>();

Expand All @@ -10,7 +12,7 @@ private UniNever() {
}

@Override
protected void subscribing(UniSerializedSubscriber<? super T> subscriber) {
protected void subscribing(UniSubscriber<? super T> subscriber) {
subscriber.onSubscribe(CANCELLED);
}
}
Loading

0 comments on commit ac80ee9

Please sign in to comment.