Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework Processor-with-upstream case to be backward compatible #3065

Merged
merged 1 commit into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions reactor-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ task japicmp(type: JapicmpTask) {
'reactor.core.publisher.Sinks$EmitFailureHandler#busyLooping(java.time.Duration)',
'reactor.core.publisher.FluxSink#contextView()',
'reactor.core.publisher.MonoSink#contextView()',
'reactor.core.publisher.SynchronousSink#contextView()',
'reactor.core.publisher.Sinks#unsafe()'
'reactor.core.publisher.SynchronousSink#contextView()'
]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.Scannable;
Expand Down Expand Up @@ -59,8 +58,7 @@
* @deprecated To be removed in 3.5. Prefer clear cut usage of {@link Sinks} through
* variations of {@link Sinks.MulticastSpec#onBackpressureBuffer() Sinks.many().multicast().onBackpressureBuffer()}.
* If you really need the subscribe-to-upstream functionality of a {@link org.reactivestreams.Processor}, switch
* to {@link Sinks.ManyWithUpstream} with {@link Sinks#unsafe()} variants of
* {@link Sinks.MulticastUnsafeSpec#onBackpressureBuffer() Sinks.unsafe().many().multicast().onBackpressureBuffer()}.
* to {@link Sinks.ManyWithUpstream} with {@link Sinks#unsafe()} variants of {@link Sinks.RootSpec#manyWithUpstream() Sinks.unsafe().manyWithUpstream()}.
* <p/>This processor was blocking in {@link EmitterProcessor#onNext(Object)}. This behaviour can be implemented with the {@link Sinks} API by calling
* {@link Sinks.Many#tryEmitNext(Object)} and retrying, e.g.:
* <pre>{@code while (sink.tryEmitNext(v).hasFailed()) {
Expand Down
117 changes: 60 additions & 57 deletions reactor-core/src/main/java/reactor/core/publisher/Sinks.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.Scannable;
Expand Down Expand Up @@ -95,16 +94,16 @@ public static ManySpec many() {
}

/**
* Return a {@link UnsafeSpec root spec} for more advanced use cases such as building operators.
* Return a {@link RootSpec root spec} for more advanced use cases such as building operators.
* Unsafe {@link Sinks.Many}, {@link Sinks.One} and {@link Sinks.Empty} are not serialized nor thread safe,
* which implies they MUST be externally synchronized so as to respect the Reactive Streams specification.
* This can typically be the case when the sinks are being called from within a Reactive Streams-compliant context,
* like a {@link Subscriber} or an operator. In turn, this allows the sinks to have less overhead, since they
* don't care to detect concurrent access anymore.
*
* @return {@link UnsafeSpec}
* @return {@link RootSpec}
*/
public static UnsafeSpec unsafe() {
public static RootSpec unsafe() {
return SinksSpecs.UNSAFE_ROOT_SPEC;
}

Expand Down Expand Up @@ -291,6 +290,7 @@ static EmitFailureHandler busyLooping(Duration duration){
boolean onEmitFailure(SignalType signalType, EmitResult emitResult);
}

//implementation note: this should now only be implemented by the Sinks.unsafe() path
/**
* Provides a choice of {@link Sinks.One}/{@link Sinks.Empty} factories and
* {@link Sinks.ManySpec further specs} for {@link Sinks.Many}.
Expand Down Expand Up @@ -328,41 +328,16 @@ public interface RootSpec {
* @return {@link ManySpec}
*/
ManySpec many();
}

/**
* Provides a choice of {@link Sinks.One}/{@link Sinks.Empty} factories and
* {@link Sinks.ManySpec further specs} for {@link Sinks.Many}, but without the
* guards against concurrent access. These raw sinks need more advanced understanding
* of the Reactive Streams specification in order to be correctly used.
* <p>
* Some flavors of {@link Sinks.Many} are {@link ManyWithUpstream} which additionally
* support being subscribed to an upstream {@link Publisher}, at most once.
* Please note that when this is done, one MUST stop using emit/tryEmit APIs, reserving signal
* creation to be the sole responsibility of the upstream {@link Publisher}.
* The list of such flavors is as follows:
* <ul>
* <li>
* {@link #many()}.{@link ManyUnsafeSpec#multicast() multicast()}.{@link MulticastUnsafeSpec#onBackpressureBuffer() onBackpressureBuffer()}
* </li>
* <li>
* {@link #many()}.{@link ManyUnsafeSpec#multicast() multicast()}.{@link MulticastUnsafeSpec#onBackpressureBuffer(int) onBackpressureBuffer(int)}
* </li>
* <li>
* {@link #many()}.{@link ManyUnsafeSpec#multicast() multicast()}.{@link MulticastUnsafeSpec#onBackpressureBuffer(int, boolean) onBackpressureBuffer(int, boolean)}
* </li>
* </ul>
*/
public interface UnsafeSpec extends RootSpec {

/**
* {@inheritDoc}
* <p>
* Some flavors return a {@link ManyWithUpstream}, supporting being subscribed to an upstream {@link Publisher}.
* Help building {@link Sinks.ManyWithUpstream} sinks that can also be {@link ManyWithUpstream#subscribeTo(Publisher) subscribed to}
* an upstream {@link Publisher}. This is an advanced use case, see {@link ManyWithUpstream#subscribeTo(Publisher)}.
*
* @return a {@link ManyWithUpstreamUnsafeSpec}
*/
@Override
ManyUnsafeSpec many();
ManyWithUpstreamUnsafeSpec manyWithUpstream();
}

/**
* Provides {@link Sinks.Many} specs for sinks which can emit multiple elements
*/
Expand Down Expand Up @@ -391,31 +366,59 @@ public interface ManySpec {
}

/**
* Provides multicast : 1 sink, N {@link Subscriber}.
* Instead of {@link Sinks#unsafe() unsafe} flavors of {@link Sinks.Many}, this spec provides {@link ManyWithUpstream}
* implementations. These additionally support being subscribed to an upstream {@link Publisher}, at most once.
* Please note that when this is done, one MUST stop using emit/tryEmit APIs, reserving signal creation to be the
* sole responsibility of the upstream {@link Publisher}.
* <p>
* This {@link MulticastSpec} provides {@link Sinks#unsafe() unsafe} flavors, including some {@link ManyWithUpstream} implementations.
* As the number of such implementations is deliberately kept low, this spec doesn't further distinguish between
* multicast/unicast/replay categories other than in method naming.
*/
public interface MulticastUnsafeSpec extends MulticastSpec {

@Override
<T> ManyWithUpstream<T> onBackpressureBuffer();

@Override
<T> ManyWithUpstream<T> onBackpressureBuffer(int bufferSize);

@Override
<T> ManyWithUpstream<T> onBackpressureBuffer(int bufferSize, boolean autoCancel);
}

/**
* Provides {@link Sinks.Many} specs for sinks which can emit multiple elements.
* <p>
* This {@link ManySpec} provides {@link Sinks#unsafe() unsafe} flavors, including some {@link ManyWithUpstream} implementations.
*/
public interface ManyUnsafeSpec extends ManySpec {
public interface ManyWithUpstreamUnsafeSpec {
/**
* A {@link Sinks.ManyWithUpstream} with the following characteristics:
* <ul>
* <li>Multicast</li>
* <li>Without {@link Subscriber}: warm up. Remembers up to {@link Queues#SMALL_BUFFER_SIZE}
* elements pushed via {@link Many#tryEmitNext(Object)} before the first {@link Subscriber} is registered.</li>
* <li>Backpressure : this sink honors downstream demand by conforming to the lowest demand in case
* of multiple subscribers.<br>If the difference between multiple subscribers is greater than {@link Queues#SMALL_BUFFER_SIZE}:
* <ul><li>{@link Many#tryEmitNext(Object) tryEmitNext} will return {@link EmitResult#FAIL_OVERFLOW}</li>
* <li>{@link Many#emitNext(Object, Sinks.EmitFailureHandler) emitNext} will terminate the sink by {@link Many#emitError(Throwable, Sinks.EmitFailureHandler) emitting}
* an {@link Exceptions#failWithOverflow() overflow error}.</li></ul>
* </li>
* <li>Replaying: No replay of values seen by earlier subscribers. Only forwards to a {@link Subscriber}
* the elements that have been pushed to the sink AFTER this subscriber was subscribed, or elements
* that have been buffered due to backpressure/warm up.</li>
* </ul>
* <p>
* <img class="marble" src="doc-files/marbles/sinkWarmup.svg" alt="">
*/
<T> ManyWithUpstream<T> multicastOnBackpressureBuffer();

@Override
MulticastUnsafeSpec multicast();
/**
* A {@link Sinks.ManyWithUpstream} with the following characteristics:
* <ul>
* <li>Multicast</li>
* <li>Without {@link Subscriber}: warm up. Remembers up to {@code bufferSize}
* elements pushed via {@link Many#tryEmitNext(Object)} before the first {@link Subscriber} is registered.</li>
* <li>Backpressure : this sink honors downstream demand by conforming to the lowest demand in case
* of multiple subscribers.<br>If the difference between multiple subscribers is too high compared to {@code bufferSize}:
* <ul><li>{@link Many#tryEmitNext(Object) tryEmitNext} will return {@link EmitResult#FAIL_OVERFLOW}</li>
* <li>{@link Many#emitNext(Object, Sinks.EmitFailureHandler) emitNext} will terminate the sink by {@link Many#emitError(Throwable, Sinks.EmitFailureHandler) emitting}
* an {@link Exceptions#failWithOverflow() overflow error}.</li></ul>
* </li>
* <li>Replaying: No replay of values seen by earlier subscribers. Only forwards to a {@link Subscriber}
* the elements that have been pushed to the sink AFTER this subscriber was subscribed, or elements
* that have been buffered due to backpressure/warm up.</li>
* </ul>
* <p>
* <img class="marble" src="doc-files/marbles/sinkWarmup.svg" alt="">
*
* @param bufferSize the maximum queue size
* @param autoCancel should the sink fully shutdowns (not publishing anymore) when the last subscriber cancels
*/
<T> ManyWithUpstream<T> multicastOnBackpressureBuffer(int bufferSize, boolean autoCancel);
}

/**
Expand Down
37 changes: 26 additions & 11 deletions reactor-core/src/main/java/reactor/core/publisher/SinksSpecs.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

final class SinksSpecs {

static final Sinks.UnsafeSpec UNSAFE_ROOT_SPEC = new UnsafeSpecImpl();
static final Sinks.RootSpec UNSAFE_ROOT_SPEC = new UnsafeSpecImpl();
static final DefaultSinksSpecs DEFAULT_SINKS = new DefaultSinksSpecs();

abstract static class AbstractSerializedSink {
Expand Down Expand Up @@ -64,7 +64,7 @@ boolean tryAcquire(Thread currentThread) {


static final class UnsafeSpecImpl
implements Sinks.UnsafeSpec, Sinks.ManyUnsafeSpec, Sinks.MulticastUnsafeSpec, Sinks.MulticastReplaySpec {
implements Sinks.RootSpec, Sinks.ManySpec, Sinks.ManyWithUpstreamUnsafeSpec, Sinks.MulticastSpec, Sinks.MulticastReplaySpec {

final Sinks.UnicastSpec unicastSpec;

Expand All @@ -83,7 +83,7 @@ public <T> One<T> one() {
}

@Override
public Sinks.ManyUnsafeSpec many() {
public Sinks.ManySpec many() {
return this;
}

Expand All @@ -92,29 +92,34 @@ public Sinks.UnicastSpec unicast() {
return this.unicastSpec;
}

@Override
public Sinks.MulticastSpec multicast() {
return this;
}

@Override
public Sinks.MulticastReplaySpec replay() {
return this;
}

@Override
public <T> Sinks.ManyWithUpstream<T> onBackpressureBuffer() {
return new EmitterProcessor<>(true, Queues.SMALL_BUFFER_SIZE);
public Sinks.ManyWithUpstreamUnsafeSpec manyWithUpstream() {
return this;
}

@Override
public <T> Sinks.ManyWithUpstream<T> onBackpressureBuffer(int bufferSize) {
return new EmitterProcessor<>(true, bufferSize);
public <T> Sinks.Many<T> onBackpressureBuffer() {
return new EmitterProcessor<>(true, Queues.SMALL_BUFFER_SIZE);
}

@Override
public <T> Sinks.ManyWithUpstream<T> onBackpressureBuffer(int bufferSize, boolean autoCancel) {
return new EmitterProcessor<>(autoCancel, bufferSize);
public <T> Sinks.Many<T> onBackpressureBuffer(int bufferSize) {
return new EmitterProcessor<>(true, bufferSize);
}

@Override
public Sinks.MulticastUnsafeSpec multicast() {
return this;
public <T> Sinks.Many<T> onBackpressureBuffer(int bufferSize, boolean autoCancel) {
return new EmitterProcessor<>(autoCancel, bufferSize);
}

@Override
Expand Down Expand Up @@ -171,6 +176,16 @@ public <T> Many<T> limit(int historySize, Duration maxAge) {
public <T> Many<T> limit(int historySize, Duration maxAge, Scheduler scheduler) {
return ReplayProcessor.createSizeAndTimeout(historySize, maxAge, scheduler);
}

@Override
public <T> Sinks.ManyWithUpstream<T> multicastOnBackpressureBuffer() {
return new EmitterProcessor<>(true, Queues.SMALL_BUFFER_SIZE);
}

@Override
public <T> Sinks.ManyWithUpstream<T> multicastOnBackpressureBuffer(int bufferSize, boolean autoCancel) {
return new EmitterProcessor<>(autoCancel, bufferSize);
}
}

//Note: RootSpec is now reserved for Sinks.unsafe()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import org.assertj.core.data.Percentage;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
Expand All @@ -48,7 +46,6 @@
import reactor.test.publisher.TestPublisher;
import reactor.test.subscriber.AssertSubscriber;
import reactor.test.subscriber.TestSubscriber;
import reactor.test.util.RaceTestUtils;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
Expand All @@ -70,8 +67,8 @@ public class EmitterProcessorTest {
AutoDisposingExtension afterTest = new AutoDisposingExtension();

@Test
void smokeTestManySubscriber() {
final Sinks.ManyWithUpstream<Integer> adapter = Sinks.unsafe().many().multicast().onBackpressureBuffer();
void smokeTestManyWithUpstream() {
final Sinks.ManyWithUpstream<Integer> adapter = Sinks.unsafe().manyWithUpstream().multicastOnBackpressureBuffer();
final TestSubscriber<Integer> testSubscriber1 = TestSubscriber.create();
final TestSubscriber<Integer> testSubscriber2 = TestSubscriber.create();
final Flux<Integer> upstream = Flux.range(1, 10);
Expand Down Expand Up @@ -102,7 +99,7 @@ void smokeTestManySubscriber() {

@Test
void smokeTestSubscribeAndDispose() {
final Sinks.ManyWithUpstream<Integer> adapter = Sinks.unsafe().many().multicast().onBackpressureBuffer();
final Sinks.ManyWithUpstream<Integer> adapter = Sinks.unsafe().manyWithUpstream().multicastOnBackpressureBuffer();
final TestSubscriber<Integer> testSubscriber1 = TestSubscriber.create();
final TestSubscriber<Integer> testSubscriber2 = TestSubscriber.create();
final Flux<Integer> upstream = Flux.never();
Expand Down Expand Up @@ -130,7 +127,7 @@ void smokeTestSubscribeAndDispose() {

@Test
void subscribeToUpstreamTwiceSkipsSecondSubscription() {
final Sinks.ManyWithUpstream<Integer> adapter = Sinks.unsafe().many().multicast().onBackpressureBuffer(123);
final Sinks.ManyWithUpstream<Integer> adapter = Sinks.unsafe().manyWithUpstream().multicastOnBackpressureBuffer(123, true);
final TestPublisher<Integer> upstream1 = TestPublisher.create();
final TestPublisher<Integer> upstream2 = TestPublisher.create();

Expand Down