Skip to content

Commit

Permalink
Restore where onLastOperatorHook is applied (#3673)
Browse files Browse the repository at this point in the history
Recent improvements in the automatic context propagation (#3549)
resulted in a regression – some sites where the "last operator" hook was
previously applied no longer saw that behaviour. This change restores
it.

Implementation wise, it's worth noting that the "last operator"
functionality relies on executing the subscribe(Subscriber) method from
the base reactive-streams Publisher instead of the overloads that come
from CorePublisher. The implementations of the reactive-streams base
method in reactor-core apply this hook and that is when something is
considered a "last operator". The wrapping of the Publisher when a
non-internal producer is encountered to restore ThreadLocal values has
changed the compiler's inference of the signature to use the
CoreSubscriber argument variant, breaking the behaviour.

This commit does not bring any tests as the functionality was not
extensively tested before. The issue was discovered in spring-security
in spring-projects/spring-security#14207 and
the change has been validated against the actual use case.
  • Loading branch information
chemicL authored Jan 2, 2024
1 parent 4e6f40e commit 378543d
Show file tree
Hide file tree
Showing 24 changed files with 85 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ void open(OPEN token) {

BufferWhenCloseSubscriber<T, BUFFER> bc = new BufferWhenCloseSubscriber<>(this, idx);
subscribers.add(bc);
Operators.toFluxOrMono(p).subscribe(bc);
p = Operators.toFluxOrMono(p);
p.subscribe(bc);
}

void openComplete(BufferWhenOpenSubscriber<OPEN> os) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public void subscribe(CoreSubscriber<? super T> actual) {
if (p == null) {
Operators.error(actual, new NullPointerException("The single source Publisher is null"));
} else {
Operators.toFluxOrMono(p).subscribe(actual);
p = Operators.toFluxOrMono(p);
p.subscribe(actual);
}
return;
}
Expand Down Expand Up @@ -255,7 +256,8 @@ public void onComplete() {
if (this.cancelled) {
return;
}
Operators.toFluxOrMono(p).subscribe(this);
p = Operators.toFluxOrMono(p);
p.subscribe(this);

final Object state = this.get();
if (state != DONE) {
Expand Down Expand Up @@ -404,7 +406,7 @@ public void onComplete() {
return;
}

final Publisher<? extends T> p = a[i];
Publisher<? extends T> p = a[i];

if (p == null) {
this.remove();
Expand Down Expand Up @@ -440,7 +442,8 @@ public void onComplete() {
return;
}

Operators.toFluxOrMono(p).subscribe(this);
p = Operators.toFluxOrMono(p);
p.subscribe(this);

final Object state = this.get();
if (state != DONE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ public void onComplete() {
produced(c);
}

Operators.toFluxOrMono(p).subscribe(this);
p = Operators.toFluxOrMono(p);
p.subscribe(this);

if (isCancelled()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,8 @@ void drain() {
}
else {
active = true;
Operators.toFluxOrMono(p).subscribe(inner);
p = Operators.toFluxOrMono(p);
p.subscribe(inner);
}
}
}
Expand Down Expand Up @@ -805,7 +806,8 @@ void drain() {
}
else {
active = true;
Operators.toFluxOrMono(p).subscribe(inner);
p = Operators.toFluxOrMono(p);
p.subscribe(inner);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ public void onNext(T t) {
return;
}

Operators.toFluxOrMono(p).subscribe(inner);
p = Operators.toFluxOrMono(p);
p.subscribe(inner);
}
catch (Throwable e) {
Context ctx = actual.currentContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ void drain() {
FilterWhenInner inner = new FilterWhenInner(this, !(p instanceof Mono));
if (CURRENT.compareAndSet(this,null, inner)) {
state = STATE_RUNNING;
Operators.toFluxOrMono(p).subscribe(inner);
p = Operators.toFluxOrMono(p);
p.subscribe(inner);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public void subscribe(CoreSubscriber<? super T> actual) {
new NullPointerException("The single source Publisher is null"));
}
else {
Operators.toFluxOrMono(p).subscribe(actual);
p = Operators.toFluxOrMono(p);
p.subscribe(actual);
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ void subscribe(Publisher<? extends T>[] sources,

actual.onSubscribe(this);

Operators.toFluxOrMono(sources);

for (int i = 0; i < n; i++) {
if (cancelled || winner != Integer.MIN_VALUE) {
return;
Expand All @@ -237,7 +239,7 @@ void subscribe(Publisher<? extends T>[] sources,
return;
}

Operators.toFluxOrMono(sources[i]).subscribe(subscribers[i]);
sources[i].subscribe(subscribers[i]);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ void drain() {
new LeftRightEndSubscriber(this, true, idx);
cancellations.add(end);

Operators.toFluxOrMono(p).subscribe(end);
p = Operators.toFluxOrMono(p);
p.subscribe(end);

ex = error;
if (ex != null) {
Expand Down Expand Up @@ -404,7 +405,8 @@ else if (mode == RIGHT_VALUE) {
new LeftRightEndSubscriber(this, false, idx);
cancellations.add(end);

Operators.toFluxOrMono(p).subscribe(end);
p = Operators.toFluxOrMono(p);
p.subscribe(end);

ex = error;
if (ex != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -289,7 +288,8 @@ void drain() {
new LeftRightEndSubscriber(this, true, idx);
cancellations.add(end);

Operators.toFluxOrMono(p).subscribe(end);
p = Operators.toFluxOrMono(p);
p.subscribe(end);

ex = error;
if (ex != null) {
Expand Down Expand Up @@ -366,7 +366,8 @@ else if (mode == RIGHT_VALUE) {
new LeftRightEndSubscriber(this, false, idx);
cancellations.add(end);

Operators.toFluxOrMono(p).subscribe(end);
p = Operators.toFluxOrMono(p);
p.subscribe(end);

ex = error;
if (ex != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> act
return null;
}

Operators.toFluxOrMono(p).subscribe(other);
p = Operators.toFluxOrMono(p);
p.subscribe(other);

if (!main.cancelled) {
return main;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ static <T> void subscribe(CoreSubscriber<? super T> s,
return;
}

Operators.toFluxOrMono(p).subscribe(other);
p = Operators.toFluxOrMono(p);
p.subscribe(other);

if (!main.cancelled) {
wrapped.subscribe(main);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ public void onNext(T t) {
SampleFirstOther<U> other = new SampleFirstOther<>(this);

if (Operators.replace(OTHER, this, other)) {
Operators.toFluxOrMono(p).subscribe(other);
p = Operators.toFluxOrMono(p);
p.subscribe(other);
}
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ public void onNext(T t) {
SampleTimeoutOther<T, U> os = new SampleTimeoutOther<>(this, t, idx);

if (Operators.replace(OTHER, this, os)) {
Operators.toFluxOrMono(p).subscribe(os);
p = Operators.toFluxOrMono(p);
p.subscribe(os);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ public void onNext(T t) {

if (INNER.compareAndSet(this, si, innerSubscriber)) {
ACTIVE.getAndIncrement(this);
Operators.toFluxOrMono(p).subscribe(innerSubscriber);
p = Operators.toFluxOrMono(p);
p.subscribe(innerSubscriber);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ void subscribeInner(T nextElement, SwitchMapInner<T, R> nextInner, int nextIndex
return;
}

Operators.toFluxOrMono(p).subscribe(nextInner);
p = Operators.toFluxOrMono(p);
p.subscribe(nextInner);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ public final void onNext(T t) {
return;
}

final Publisher<? extends R> outboundPublisher;
Publisher<? extends R> outboundPublisher;
final SwitchOnFirstControlSubscriber<? super R> o = this.outboundSubscriber;

try {
Expand All @@ -542,7 +542,8 @@ public final void onNext(T t) {
return;
}

Operators.toFluxOrMono(outboundPublisher).subscribe(o);
outboundPublisher = Operators.toFluxOrMono(outboundPublisher);
outboundPublisher.subscribe(o);
return;
}

Expand Down Expand Up @@ -575,7 +576,7 @@ public final void onError(Throwable t) {
}

if (!hasFirstValueReceived(previousState)) {
final Publisher<? extends R> result;
Publisher<? extends R> result;
final CoreSubscriber<? super R> o = this.outboundSubscriber;
try {
final Signal<T> signal = Signal.error(t, o.currentContext());
Expand All @@ -586,7 +587,8 @@ public final void onError(Throwable t) {
return;
}

Operators.toFluxOrMono(result).subscribe(o);
result = Operators.toFluxOrMono(result);
result.subscribe(o);
}
}

Expand All @@ -611,7 +613,7 @@ public final void onComplete() {
}

if (!hasFirstValueReceived(previousState)) {
final Publisher<? extends R> result;
Publisher<? extends R> result;
final CoreSubscriber<? super R> o = this.outboundSubscriber;

try {
Expand All @@ -623,7 +625,8 @@ public final void onComplete() {
return;
}

Operators.toFluxOrMono(result).subscribe(o);
result = Operators.toFluxOrMono(result);
result.subscribe(o);
}
}

Expand Down Expand Up @@ -844,7 +847,7 @@ public boolean tryOnNext(T t) {
return true;
}

final Publisher<? extends R> result;
Publisher<? extends R> result;
final SwitchOnFirstControlSubscriber<? super R> o = this.outboundSubscriber;

try {
Expand All @@ -868,7 +871,8 @@ public boolean tryOnNext(T t) {
return true;
}

Operators.toFluxOrMono(result).subscribe(o);
result = Operators.toFluxOrMono(result);
result.subscribe(o);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Function;

Expand Down Expand Up @@ -91,7 +90,8 @@ public void subscribe(CoreSubscriber<? super T> actual) {
asyncCancel,
null);

Operators.toFluxOrMono(p).subscribe(subscriber);
p = Operators.toFluxOrMono(p);
p.subscribe(subscriber);
}
}
catch (Throwable e) {
Expand All @@ -101,8 +101,9 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}

//trigger the resource creation and delay the subscription to actual
Operators.toFluxOrMono(resourceSupplier).subscribe(new ResourceSubscriber(actual,
resourceClosure, asyncComplete, asyncError, asyncCancel, resourceSupplier instanceof Mono));
// + ensure onLastOperatorHook is called by invoking Publisher::subscribe(Subscriber)
((Publisher<?>) Operators.toFluxOrMono(resourceSupplier)).subscribe(
new ResourceSubscriber(actual, resourceClosure, asyncComplete, asyncError, asyncCancel, resourceSupplier instanceof Mono));
}

@Override
Expand Down Expand Up @@ -191,9 +192,10 @@ public void onNext(S resource) {
}
resourceProvided = true;

final Publisher<? extends T> p = deriveFluxFromResource(resource, resourceClosure);
Publisher<? extends T> p = deriveFluxFromResource(resource, resourceClosure);

Operators.toFluxOrMono(p).subscribe(FluxUsingWhen.<S, T>prepareSubscriberForResource(resource,
p = Operators.toFluxOrMono(p);
p.subscribe(FluxUsingWhen.<S, T>prepareSubscriberForResource(resource,
this.actual,
this.asyncComplete,
this.asyncError,
Expand Down Expand Up @@ -362,7 +364,8 @@ public void onError(Throwable t) {
return;
}

Operators.toFluxOrMono(p).subscribe(new RollbackInner(this, t));
p = Operators.toFluxOrMono(p);
p.subscribe(new RollbackInner(this, t));
}
}

Expand All @@ -382,7 +385,8 @@ public void onComplete() {
return;
}

Operators.toFluxOrMono(p).subscribe(new CommitInner(this));
p = Operators.toFluxOrMono(p);
p.subscribe(new CommitInner(this));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ void subscribeNextTrigger() {
final Function<? super T, ? extends Publisher<?>> generator =
this.otherGenerators[this.index];

final Publisher<?> p;
Publisher<?> p;

try {
p = generator.apply(this.value);
Expand All @@ -304,7 +304,8 @@ void subscribeNextTrigger() {
this.triggerSubscriber = triggerSubscriber;
}

Operators.toFluxOrMono(p).subscribe(triggerSubscriber);
p = Operators.toFluxOrMono(p);
p.subscribe(triggerSubscriber);
}

@Override
Expand Down
Loading

0 comments on commit 378543d

Please sign in to comment.