Skip to content

Commit

Permalink
Introduce an alternative to Processor-with-upstream case (#3042)
Browse files Browse the repository at this point in the history
This commit introduces an additional migration path for a subset of
Processors (namely the `EmitterProcessor`) before they get removed from
the public API in 3.5.0.

The `Sinks.ManyWithUpstream` is a Processor-adjacent API that supports
the connection of some Sinks.Many to an upstream like Processors used to
do, but without exposing the `CoreSubscriber` API:
 - use of the `Publisher` aspect via `asFlux()` similar to `Sinks.Many`
 - subscription to an upstream via `subscribeTo(Publisher)`

This way, the underlying `CoreSubscriber` nature is never revealed to
the user which avoids the trouble with mixing incompatible APIs.

As a first scope, only `EmitterProcessor`-backed variants of sinks are
offered, which map to `multicast().onBackpressureBuffer(...)` flavors.
As an additional indication that this is an advanced case, these flavors
are only exposed as `Sinks.ManyWithUpstream` under the `unsafe().many()`
spec, not the root `.many()` spec.
  • Loading branch information
simonbasle authored May 23, 2022
1 parent bee3d07 commit b07f391
Show file tree
Hide file tree
Showing 6 changed files with 388 additions and 44 deletions.
3 changes: 2 additions & 1 deletion reactor-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ task japicmp(type: JapicmpTask) {
'reactor.core.publisher.Sinks$EmitFailureHandler#busyLooping(java.time.Duration)',
'reactor.core.publisher.FluxSink#contextView()',
'reactor.core.publisher.MonoSink#contextView()',
'reactor.core.publisher.SynchronousSink#contextView()'
'reactor.core.publisher.SynchronousSink#contextView()',
'reactor.core.publisher.Sinks#unsafe()'
]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Stream;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.Scannable;
Expand Down Expand Up @@ -54,16 +58,19 @@
* @author Stephane Maldini
* @deprecated To be removed in 3.5. Prefer clear cut usage of {@link Sinks} through
* variations of {@link Sinks.MulticastSpec#onBackpressureBuffer() Sinks.many().multicast().onBackpressureBuffer()}.
* This processor was blocking in {@link EmitterProcessor#onNext(Object)}.
* This behaviour can be implemented with the {@link Sinks} API by calling
* If you really need the subscribe-to-upstream functionality of a {@link org.reactivestreams.Processor}, switch
* to {@link Sinks.ManyWithUpstream} with {@link Sinks#unsafe()} variants of
* {@link Sinks.MulticastUnsafeSpec#onBackpressureBuffer() Sinks.unsafe().many().multicast().onBackpressureBuffer()}.
* <p/>This processor was blocking in {@link EmitterProcessor#onNext(Object)}. This behaviour can be implemented with the {@link Sinks} API by calling
* {@link Sinks.Many#tryEmitNext(Object)} and retrying, e.g.:
* <pre>{@code while (sink.tryEmitNext(v).hasFailed()) {
* LockSupport.parkNanos(10);
* }
* }</pre>
*/
@Deprecated
public final class EmitterProcessor<T> extends FluxProcessor<T, T> implements InternalManySink<T> {
public final class EmitterProcessor<T> extends FluxProcessor<T, T> implements InternalManySink<T>,
Sinks.ManyWithUpstream<T> {

@SuppressWarnings("rawtypes")
static final FluxPublish.PubSubInner[] EMPTY = new FluxPublish.PublishInner[0];
Expand Down Expand Up @@ -150,6 +157,12 @@ public static <E> EmitterProcessor<E> create(int bufferSize, boolean autoCancel)
FluxPublish.PubSubInner[].class,
"subscribers");

volatile EmitterDisposable upstreamDisposable;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<EmitterProcessor, EmitterDisposable> UPSTREAM_DISPOSABLE =
AtomicReferenceFieldUpdater.newUpdater(EmitterProcessor.class, EmitterDisposable.class, "upstreamDisposable");


@SuppressWarnings("unused")
volatile int wip;

Expand Down Expand Up @@ -192,6 +205,39 @@ public Context currentContext() {
return Operators.multiSubscribersContext(subscribers);
}


private boolean isDetached() {
return s == Operators.cancelledSubscription() && done && error instanceof CancellationException;
}

private boolean detach() {
if (Operators.terminate(S, this)) {
done = true;
CancellationException detachException = new CancellationException("the ManyWithUpstream sink had a Subscription to an upstream which has been manually cancelled");
if (ERROR.compareAndSet(EmitterProcessor.this, null, detachException)) {
Queue<T> q = queue;
if (q != null) {
q.clear();
}
for (FluxPublish.PubSubInner<T> inner : terminate()) {
inner.actual.onError(detachException);
}
return true;
}
}
return false;
}

@Override
public Disposable subscribeTo(Publisher<? extends T> upstream) {
EmitterDisposable ed = new EmitterDisposable(this);
if (UPSTREAM_DISPOSABLE.compareAndSet(this, null, ed)) {
upstream.subscribe(this);
return ed;
}
throw new IllegalStateException("A Sinks.ManyWithUpstream must be subscribed to a source only once");
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
Objects.requireNonNull(actual, "subscribe");
Expand Down Expand Up @@ -332,6 +378,8 @@ public boolean isDisposed() {

@Override
public void onSubscribe(final Subscription s) {
//since the CoreSubscriber nature isn't exposed to the user, the only path to onSubscribe is
//already guarded by UPSTREAM_DISPOSABLE. just in case the publisher misbehaves we still use setOnce
if (Operators.setOnce(S, this, s)) {
if (s instanceof Fuseable.QueueSubscription) {
@SuppressWarnings("unchecked") Fuseable.QueueSubscription<T> f =
Expand Down Expand Up @@ -647,5 +695,31 @@ void removeAndDrainParent() {
}
}

static final class EmitterDisposable implements Disposable {

@Nullable
EmitterProcessor<?> target;

public EmitterDisposable(EmitterProcessor<?> emitterProcessor) {
this.target = emitterProcessor;
}

@Override
public boolean isDisposed() {
return target == null || target.isDetached();
}

@Override
public void dispose() {
EmitterProcessor<?> t = target;
if (t == null) {
return;
}
if (t.detach() || t.isDetached()) {
target = null;
}
}
}


}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,7 +42,7 @@
* @param <IN> the input value type
* @param <OUT> the output value type
* @deprecated Processors will be removed in 3.5. Prefer using {@link Sinks.Many} instead,
* * or see https://github.com/reactor/reactor-core/issues/2431 for alternatives
* or see https://github.com/reactor/reactor-core/issues/2431 for alternatives
*/
@Deprecated
public abstract class FluxProcessor<IN, OUT> extends Flux<OUT>
Expand Down
103 changes: 97 additions & 6 deletions reactor-core/src/main/java/reactor/core/publisher/Sinks.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import java.time.Duration;
import java.util.Queue;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.Scannable;
Expand Down Expand Up @@ -61,7 +64,7 @@ private Sinks() {
* @see RootSpec#empty()
*/
public static <T> Sinks.Empty<T> empty() {
return SinksSpecs.DEFAULT_ROOT_SPEC.empty();
return SinksSpecs.DEFAULT_SINKS.empty();
}

/**
Expand All @@ -76,7 +79,7 @@ public static <T> Sinks.Empty<T> empty() {
* @see RootSpec#one()
*/
public static <T> Sinks.One<T> one() {
return SinksSpecs.DEFAULT_ROOT_SPEC.one();
return SinksSpecs.DEFAULT_SINKS.one();
}

/**
Expand All @@ -88,20 +91,20 @@ public static <T> Sinks.One<T> one() {
* @see RootSpec#many()
*/
public static ManySpec many() {
return SinksSpecs.DEFAULT_ROOT_SPEC.many();
return SinksSpecs.DEFAULT_SINKS.many();
}

/**
* Return a {@link RootSpec root spec} for more advanced use cases such as building operators.
* Return a {@link UnsafeSpec root spec} for more advanced use cases such as building operators.
* Unsafe {@link Sinks.Many}, {@link Sinks.One} and {@link Sinks.Empty} are not serialized nor thread safe,
* which implies they MUST be externally synchronized so as to respect the Reactive Streams specification.
* This can typically be the case when the sinks are being called from within a Reactive Streams-compliant context,
* like a {@link Subscriber} or an operator. In turn, this allows the sinks to have less overhead, since they
* don't care to detect concurrent access anymore.
*
* @return {@link RootSpec}
* @return {@link UnsafeSpec}
*/
public static RootSpec unsafe() {
public static UnsafeSpec unsafe() {
return SinksSpecs.UNSAFE_ROOT_SPEC;
}

Expand Down Expand Up @@ -327,6 +330,39 @@ public interface RootSpec {
ManySpec many();
}

/**
* Provides a choice of {@link Sinks.One}/{@link Sinks.Empty} factories and
* {@link Sinks.ManySpec further specs} for {@link Sinks.Many}, but without the
* guards against concurrent access. These raw sinks need more advanced understanding
* of the Reactive Streams specification in order to be correctly used.
* <p>
* Some flavors of {@link Sinks.Many} are {@link ManyWithUpstream} which additionally
* support being subscribed to an upstream {@link Publisher}, at most once.
* Please note that when this is done, one MUST stop using emit/tryEmit APIs, reserving signal
* creation to be the sole responsibility of the upstream {@link Publisher}.
* The list of such flavors is as follows:
* <ul>
* <li>
* {@link #many()}.{@link ManyUnsafeSpec#multicast() multicast()}.{@link MulticastUnsafeSpec#onBackpressureBuffer() onBackpressureBuffer()}
* </li>
* <li>
* {@link #many()}.{@link ManyUnsafeSpec#multicast() multicast()}.{@link MulticastUnsafeSpec#onBackpressureBuffer(int) onBackpressureBuffer(int)}
* </li>
* <li>
* {@link #many()}.{@link ManyUnsafeSpec#multicast() multicast()}.{@link MulticastUnsafeSpec#onBackpressureBuffer(int, boolean) onBackpressureBuffer(int, boolean)}
* </li>
* </ul>
*/
public interface UnsafeSpec extends RootSpec {

/**
* {@inheritDoc}
* <p>
* Some flavors return a {@link ManyWithUpstream}, supporting being subscribed to an upstream {@link Publisher}.
*/
@Override
ManyUnsafeSpec many();
}
/**
* Provides {@link Sinks.Many} specs for sinks which can emit multiple elements
*/
Expand Down Expand Up @@ -354,6 +390,34 @@ public interface ManySpec {
MulticastReplaySpec replay();
}

/**
* Provides multicast : 1 sink, N {@link Subscriber}.
* <p>
* This {@link MulticastSpec} provides {@link Sinks#unsafe() unsafe} flavors, including some {@link ManyWithUpstream} implementations.
*/
public interface MulticastUnsafeSpec extends MulticastSpec {

@Override
<T> ManyWithUpstream<T> onBackpressureBuffer();

@Override
<T> ManyWithUpstream<T> onBackpressureBuffer(int bufferSize);

@Override
<T> ManyWithUpstream<T> onBackpressureBuffer(int bufferSize, boolean autoCancel);
}

/**
* Provides {@link Sinks.Many} specs for sinks which can emit multiple elements.
* <p>
* This {@link ManySpec} provides {@link Sinks#unsafe() unsafe} flavors, including some {@link ManyWithUpstream} implementations.
*/
public interface ManyUnsafeSpec extends ManySpec {

@Override
MulticastUnsafeSpec multicast();
}

/**
* Provides unicast: 1 sink, 1 {@link Subscriber}
*/
Expand Down Expand Up @@ -892,6 +956,33 @@ public interface Many<T> extends Scannable {
Flux<T> asFlux();
}

/**
* A {@link Sinks.Many} which additionally allows being subscribed to an upstream {@link Publisher},
* which is an advanced pattern requiring external synchronization. See {@link #subscribeTo(Publisher)}} for more details.
*
* @param <T> the type of data emitted by the sink
*/
public interface ManyWithUpstream<T> extends Many<T> {

/**
* Explicitly subscribe this {@link Sinks.Many} to an upstream {@link Publisher} without
* exposing it as a {@link Subscriber} at all.
* <p>
* Note that when this is done, one MUST stop using emit/tryEmit APIs, reserving signal
* creation to be the sole responsibility of the upstream {@link Publisher}.
* <p>
* The returned {@link Disposable} provides a way of both unsubscribing from the upstream
* and terminating the sink: currently registered subscribers downstream receive an {@link Subscriber#onError(Throwable) onError}
* signal with a {@link java.util.concurrent.CancellationException} and further attempts at subscribing
* to the sink will trigger a similar signal immediately (in which case the returned {@link Disposable} might be no-op).
* <p>
* Any attempt at subscribing the same {@link ManyWithUpstream} multiple times throws an {@link IllegalStateException}
* indicating that the subscription must be unique.
*/
Disposable subscribeTo(Publisher<? extends T> upstream);

}

/**
* A base interface for standalone {@link Sinks} with complete-or-fail semantics.
* <p>
Expand Down
Loading

0 comments on commit b07f391

Please sign in to comment.