Skip to content

Commit

Permalink
Rework Processor-with-upstream case to be backward compatible (#3065)
Browse files Browse the repository at this point in the history
This commit reworks #3042 and reverts the changes to RootSpec returning
methods. These changes were binary incompatible with the previous
release of reactor, despite an API-compatible change.

What is reverted:
 - the `UnsafeRootSpec` and intermediate specs are removed
 - `Sinks.unsafe()` is back to returning `RootSpec`

What is kept:
 - `RootSpec` is dedicated to `Sinks.unsafe()`
 - `Sinks.manyWithUpstream` interface is still introduced

What is modified:
 - `RootSpec` receives an additional `manyWithUpstream()` method which
 is the one exposing `Sinks.manyWithUpstream` implementations
  • Loading branch information
simonbasle authored Jun 7, 2022
1 parent 6552d60 commit f215a64
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 80 deletions.
3 changes: 1 addition & 2 deletions reactor-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ task japicmp(type: JapicmpTask) {
'reactor.core.publisher.Sinks$EmitFailureHandler#busyLooping(java.time.Duration)',
'reactor.core.publisher.FluxSink#contextView()',
'reactor.core.publisher.MonoSink#contextView()',
'reactor.core.publisher.SynchronousSink#contextView()',
'reactor.core.publisher.Sinks#unsafe()'
'reactor.core.publisher.SynchronousSink#contextView()'
]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.Scannable;
Expand Down Expand Up @@ -59,8 +58,7 @@
* @deprecated To be removed in 3.5. Prefer clear cut usage of {@link Sinks} through
* variations of {@link Sinks.MulticastSpec#onBackpressureBuffer() Sinks.many().multicast().onBackpressureBuffer()}.
* If you really need the subscribe-to-upstream functionality of a {@link org.reactivestreams.Processor}, switch
* to {@link Sinks.ManyWithUpstream} with {@link Sinks#unsafe()} variants of
* {@link Sinks.MulticastUnsafeSpec#onBackpressureBuffer() Sinks.unsafe().many().multicast().onBackpressureBuffer()}.
* to {@link Sinks.ManyWithUpstream} with {@link Sinks#unsafe()} variants of {@link Sinks.RootSpec#manyWithUpstream() Sinks.unsafe().manyWithUpstream()}.
* <p/>This processor was blocking in {@link EmitterProcessor#onNext(Object)}. This behaviour can be implemented with the {@link Sinks} API by calling
* {@link Sinks.Many#tryEmitNext(Object)} and retrying, e.g.:
* <pre>{@code while (sink.tryEmitNext(v).hasFailed()) {
Expand Down
117 changes: 60 additions & 57 deletions reactor-core/src/main/java/reactor/core/publisher/Sinks.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.Scannable;
Expand Down Expand Up @@ -95,16 +94,16 @@ public static ManySpec many() {
}

/**
* Return a {@link UnsafeSpec root spec} for more advanced use cases such as building operators.
* Return a {@link RootSpec root spec} for more advanced use cases such as building operators.
* Unsafe {@link Sinks.Many}, {@link Sinks.One} and {@link Sinks.Empty} are not serialized nor thread safe,
* which implies they MUST be externally synchronized so as to respect the Reactive Streams specification.
* This can typically be the case when the sinks are being called from within a Reactive Streams-compliant context,
* like a {@link Subscriber} or an operator. In turn, this allows the sinks to have less overhead, since they
* don't care to detect concurrent access anymore.
*
* @return {@link UnsafeSpec}
* @return {@link RootSpec}
*/
public static UnsafeSpec unsafe() {
public static RootSpec unsafe() {
return SinksSpecs.UNSAFE_ROOT_SPEC;
}

Expand Down Expand Up @@ -291,6 +290,7 @@ static EmitFailureHandler busyLooping(Duration duration){
boolean onEmitFailure(SignalType signalType, EmitResult emitResult);
}

//implementation note: this should now only be implemented by the Sinks.unsafe() path
/**
* Provides a choice of {@link Sinks.One}/{@link Sinks.Empty} factories and
* {@link Sinks.ManySpec further specs} for {@link Sinks.Many}.
Expand Down Expand Up @@ -328,41 +328,16 @@ public interface RootSpec {
* @return {@link ManySpec}
*/
ManySpec many();
}

/**
* Provides a choice of {@link Sinks.One}/{@link Sinks.Empty} factories and
* {@link Sinks.ManySpec further specs} for {@link Sinks.Many}, but without the
* guards against concurrent access. These raw sinks need more advanced understanding
* of the Reactive Streams specification in order to be correctly used.
* <p>
* Some flavors of {@link Sinks.Many} are {@link ManyWithUpstream} which additionally
* support being subscribed to an upstream {@link Publisher}, at most once.
* Please note that when this is done, one MUST stop using emit/tryEmit APIs, reserving signal
* creation to be the sole responsibility of the upstream {@link Publisher}.
* The list of such flavors is as follows:
* <ul>
* <li>
* {@link #many()}.{@link ManyUnsafeSpec#multicast() multicast()}.{@link MulticastUnsafeSpec#onBackpressureBuffer() onBackpressureBuffer()}
* </li>
* <li>
* {@link #many()}.{@link ManyUnsafeSpec#multicast() multicast()}.{@link MulticastUnsafeSpec#onBackpressureBuffer(int) onBackpressureBuffer(int)}
* </li>
* <li>
* {@link #many()}.{@link ManyUnsafeSpec#multicast() multicast()}.{@link MulticastUnsafeSpec#onBackpressureBuffer(int, boolean) onBackpressureBuffer(int, boolean)}
* </li>
* </ul>
*/
public interface UnsafeSpec extends RootSpec {

/**
* {@inheritDoc}
* <p>
* Some flavors return a {@link ManyWithUpstream}, supporting being subscribed to an upstream {@link Publisher}.
* Help building {@link Sinks.ManyWithUpstream} sinks that can also be {@link ManyWithUpstream#subscribeTo(Publisher) subscribed to}
* an upstream {@link Publisher}. This is an advanced use case, see {@link ManyWithUpstream#subscribeTo(Publisher)}.
*
* @return a {@link ManyWithUpstreamUnsafeSpec}
*/
@Override
ManyUnsafeSpec many();
ManyWithUpstreamUnsafeSpec manyWithUpstream();
}

/**
* Provides {@link Sinks.Many} specs for sinks which can emit multiple elements
*/
Expand Down Expand Up @@ -391,31 +366,59 @@ public interface ManySpec {
}

/**
* Provides multicast : 1 sink, N {@link Subscriber}.
* Instead of {@link Sinks#unsafe() unsafe} flavors of {@link Sinks.Many}, this spec provides {@link ManyWithUpstream}
* implementations. These additionally support being subscribed to an upstream {@link Publisher}, at most once.
* Please note that when this is done, one MUST stop using emit/tryEmit APIs, reserving signal creation to be the
* sole responsibility of the upstream {@link Publisher}.
* <p>
* This {@link MulticastSpec} provides {@link Sinks#unsafe() unsafe} flavors, including some {@link ManyWithUpstream} implementations.
* As the number of such implementations is deliberately kept low, this spec doesn't further distinguish between
* multicast/unicast/replay categories other than in method naming.
*/
public interface MulticastUnsafeSpec extends MulticastSpec {

@Override
<T> ManyWithUpstream<T> onBackpressureBuffer();

@Override
<T> ManyWithUpstream<T> onBackpressureBuffer(int bufferSize);

@Override
<T> ManyWithUpstream<T> onBackpressureBuffer(int bufferSize, boolean autoCancel);
}

/**
* Provides {@link Sinks.Many} specs for sinks which can emit multiple elements.
* <p>
* This {@link ManySpec} provides {@link Sinks#unsafe() unsafe} flavors, including some {@link ManyWithUpstream} implementations.
*/
public interface ManyUnsafeSpec extends ManySpec {
public interface ManyWithUpstreamUnsafeSpec {
/**
* A {@link Sinks.ManyWithUpstream} with the following characteristics:
* <ul>
* <li>Multicast</li>
* <li>Without {@link Subscriber}: warm up. Remembers up to {@link Queues#SMALL_BUFFER_SIZE}
* elements pushed via {@link Many#tryEmitNext(Object)} before the first {@link Subscriber} is registered.</li>
* <li>Backpressure : this sink honors downstream demand by conforming to the lowest demand in case
* of multiple subscribers.<br>If the difference between multiple subscribers is greater than {@link Queues#SMALL_BUFFER_SIZE}:
* <ul><li>{@link Many#tryEmitNext(Object) tryEmitNext} will return {@link EmitResult#FAIL_OVERFLOW}</li>
* <li>{@link Many#emitNext(Object, Sinks.EmitFailureHandler) emitNext} will terminate the sink by {@link Many#emitError(Throwable, Sinks.EmitFailureHandler) emitting}
* an {@link Exceptions#failWithOverflow() overflow error}.</li></ul>
* </li>
* <li>Replaying: No replay of values seen by earlier subscribers. Only forwards to a {@link Subscriber}
* the elements that have been pushed to the sink AFTER this subscriber was subscribed, or elements
* that have been buffered due to backpressure/warm up.</li>
* </ul>
* <p>
* <img class="marble" src="doc-files/marbles/sinkWarmup.svg" alt="">
*/
<T> ManyWithUpstream<T> multicastOnBackpressureBuffer();

@Override
MulticastUnsafeSpec multicast();
/**
* A {@link Sinks.ManyWithUpstream} with the following characteristics:
* <ul>
* <li>Multicast</li>
* <li>Without {@link Subscriber}: warm up. Remembers up to {@code bufferSize}
* elements pushed via {@link Many#tryEmitNext(Object)} before the first {@link Subscriber} is registered.</li>
* <li>Backpressure : this sink honors downstream demand by conforming to the lowest demand in case
* of multiple subscribers.<br>If the difference between multiple subscribers is too high compared to {@code bufferSize}:
* <ul><li>{@link Many#tryEmitNext(Object) tryEmitNext} will return {@link EmitResult#FAIL_OVERFLOW}</li>
* <li>{@link Many#emitNext(Object, Sinks.EmitFailureHandler) emitNext} will terminate the sink by {@link Many#emitError(Throwable, Sinks.EmitFailureHandler) emitting}
* an {@link Exceptions#failWithOverflow() overflow error}.</li></ul>
* </li>
* <li>Replaying: No replay of values seen by earlier subscribers. Only forwards to a {@link Subscriber}
* the elements that have been pushed to the sink AFTER this subscriber was subscribed, or elements
* that have been buffered due to backpressure/warm up.</li>
* </ul>
* <p>
* <img class="marble" src="doc-files/marbles/sinkWarmup.svg" alt="">
*
* @param bufferSize the maximum queue size
* @param autoCancel should the sink fully shutdowns (not publishing anymore) when the last subscriber cancels
*/
<T> ManyWithUpstream<T> multicastOnBackpressureBuffer(int bufferSize, boolean autoCancel);
}

/**
Expand Down
37 changes: 26 additions & 11 deletions reactor-core/src/main/java/reactor/core/publisher/SinksSpecs.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

final class SinksSpecs {

static final Sinks.UnsafeSpec UNSAFE_ROOT_SPEC = new UnsafeSpecImpl();
static final Sinks.RootSpec UNSAFE_ROOT_SPEC = new UnsafeSpecImpl();
static final DefaultSinksSpecs DEFAULT_SINKS = new DefaultSinksSpecs();

abstract static class AbstractSerializedSink {
Expand Down Expand Up @@ -64,7 +64,7 @@ boolean tryAcquire(Thread currentThread) {


static final class UnsafeSpecImpl
implements Sinks.UnsafeSpec, Sinks.ManyUnsafeSpec, Sinks.MulticastUnsafeSpec, Sinks.MulticastReplaySpec {
implements Sinks.RootSpec, Sinks.ManySpec, Sinks.ManyWithUpstreamUnsafeSpec, Sinks.MulticastSpec, Sinks.MulticastReplaySpec {

final Sinks.UnicastSpec unicastSpec;

Expand All @@ -83,7 +83,7 @@ public <T> One<T> one() {
}

@Override
public Sinks.ManyUnsafeSpec many() {
public Sinks.ManySpec many() {
return this;
}

Expand All @@ -92,29 +92,34 @@ public Sinks.UnicastSpec unicast() {
return this.unicastSpec;
}

@Override
public Sinks.MulticastSpec multicast() {
return this;
}

@Override
public Sinks.MulticastReplaySpec replay() {
return this;
}

@Override
public <T> Sinks.ManyWithUpstream<T> onBackpressureBuffer() {
return new EmitterProcessor<>(true, Queues.SMALL_BUFFER_SIZE);
public Sinks.ManyWithUpstreamUnsafeSpec manyWithUpstream() {
return this;
}

@Override
public <T> Sinks.ManyWithUpstream<T> onBackpressureBuffer(int bufferSize) {
return new EmitterProcessor<>(true, bufferSize);
public <T> Sinks.Many<T> onBackpressureBuffer() {
return new EmitterProcessor<>(true, Queues.SMALL_BUFFER_SIZE);
}

@Override
public <T> Sinks.ManyWithUpstream<T> onBackpressureBuffer(int bufferSize, boolean autoCancel) {
return new EmitterProcessor<>(autoCancel, bufferSize);
public <T> Sinks.Many<T> onBackpressureBuffer(int bufferSize) {
return new EmitterProcessor<>(true, bufferSize);
}

@Override
public Sinks.MulticastUnsafeSpec multicast() {
return this;
public <T> Sinks.Many<T> onBackpressureBuffer(int bufferSize, boolean autoCancel) {
return new EmitterProcessor<>(autoCancel, bufferSize);
}

@Override
Expand Down Expand Up @@ -171,6 +176,16 @@ public <T> Many<T> limit(int historySize, Duration maxAge) {
public <T> Many<T> limit(int historySize, Duration maxAge, Scheduler scheduler) {
return ReplayProcessor.createSizeAndTimeout(historySize, maxAge, scheduler);
}

@Override
public <T> Sinks.ManyWithUpstream<T> multicastOnBackpressureBuffer() {
return new EmitterProcessor<>(true, Queues.SMALL_BUFFER_SIZE);
}

@Override
public <T> Sinks.ManyWithUpstream<T> multicastOnBackpressureBuffer(int bufferSize, boolean autoCancel) {
return new EmitterProcessor<>(autoCancel, bufferSize);
}
}

//Note: RootSpec is now reserved for Sinks.unsafe()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import org.assertj.core.data.Percentage;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
Expand All @@ -48,7 +46,6 @@
import reactor.test.publisher.TestPublisher;
import reactor.test.subscriber.AssertSubscriber;
import reactor.test.subscriber.TestSubscriber;
import reactor.test.util.RaceTestUtils;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
Expand All @@ -70,8 +67,8 @@ public class EmitterProcessorTest {
AutoDisposingExtension afterTest = new AutoDisposingExtension();

@Test
void smokeTestManySubscriber() {
final Sinks.ManyWithUpstream<Integer> adapter = Sinks.unsafe().many().multicast().onBackpressureBuffer();
void smokeTestManyWithUpstream() {
final Sinks.ManyWithUpstream<Integer> adapter = Sinks.unsafe().manyWithUpstream().multicastOnBackpressureBuffer();
final TestSubscriber<Integer> testSubscriber1 = TestSubscriber.create();
final TestSubscriber<Integer> testSubscriber2 = TestSubscriber.create();
final Flux<Integer> upstream = Flux.range(1, 10);
Expand Down Expand Up @@ -102,7 +99,7 @@ void smokeTestManySubscriber() {

@Test
void smokeTestSubscribeAndDispose() {
final Sinks.ManyWithUpstream<Integer> adapter = Sinks.unsafe().many().multicast().onBackpressureBuffer();
final Sinks.ManyWithUpstream<Integer> adapter = Sinks.unsafe().manyWithUpstream().multicastOnBackpressureBuffer();
final TestSubscriber<Integer> testSubscriber1 = TestSubscriber.create();
final TestSubscriber<Integer> testSubscriber2 = TestSubscriber.create();
final Flux<Integer> upstream = Flux.never();
Expand Down Expand Up @@ -130,7 +127,7 @@ void smokeTestSubscribeAndDispose() {

@Test
void subscribeToUpstreamTwiceSkipsSecondSubscription() {
final Sinks.ManyWithUpstream<Integer> adapter = Sinks.unsafe().many().multicast().onBackpressureBuffer(123);
final Sinks.ManyWithUpstream<Integer> adapter = Sinks.unsafe().manyWithUpstream().multicastOnBackpressureBuffer(123, true);
final TestPublisher<Integer> upstream1 = TestPublisher.create();
final TestPublisher<Integer> upstream2 = TestPublisher.create();

Expand Down

0 comments on commit f215a64

Please sign in to comment.