Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore where onLastOperatorHook is applied #3673

Merged
merged 1 commit into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ void open(OPEN token) {

BufferWhenCloseSubscriber<T, BUFFER> bc = new BufferWhenCloseSubscriber<>(this, idx);
subscribers.add(bc);
Operators.toFluxOrMono(p).subscribe(bc);
p = Operators.toFluxOrMono(p);
p.subscribe(bc);
}

void openComplete(BufferWhenOpenSubscriber<OPEN> os) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public void subscribe(CoreSubscriber<? super T> actual) {
if (p == null) {
Operators.error(actual, new NullPointerException("The single source Publisher is null"));
} else {
Operators.toFluxOrMono(p).subscribe(actual);
p = Operators.toFluxOrMono(p);
p.subscribe(actual);
}
return;
}
Expand Down Expand Up @@ -255,7 +256,8 @@ public void onComplete() {
if (this.cancelled) {
return;
}
Operators.toFluxOrMono(p).subscribe(this);
p = Operators.toFluxOrMono(p);
p.subscribe(this);

final Object state = this.get();
if (state != DONE) {
Expand Down Expand Up @@ -404,7 +406,7 @@ public void onComplete() {
return;
}

final Publisher<? extends T> p = a[i];
Publisher<? extends T> p = a[i];

if (p == null) {
this.remove();
Expand Down Expand Up @@ -440,7 +442,8 @@ public void onComplete() {
return;
}

Operators.toFluxOrMono(p).subscribe(this);
p = Operators.toFluxOrMono(p);
p.subscribe(this);

final Object state = this.get();
if (state != DONE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ public void onComplete() {
produced(c);
}

Operators.toFluxOrMono(p).subscribe(this);
p = Operators.toFluxOrMono(p);
p.subscribe(this);

if (isCancelled()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,8 @@ void drain() {
}
else {
active = true;
Operators.toFluxOrMono(p).subscribe(inner);
p = Operators.toFluxOrMono(p);
p.subscribe(inner);
}
}
}
Expand Down Expand Up @@ -805,7 +806,8 @@ void drain() {
}
else {
active = true;
Operators.toFluxOrMono(p).subscribe(inner);
p = Operators.toFluxOrMono(p);
p.subscribe(inner);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ public void onNext(T t) {
return;
}

Operators.toFluxOrMono(p).subscribe(inner);
p = Operators.toFluxOrMono(p);
p.subscribe(inner);
}
catch (Throwable e) {
Context ctx = actual.currentContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ void drain() {
FilterWhenInner inner = new FilterWhenInner(this, !(p instanceof Mono));
if (CURRENT.compareAndSet(this,null, inner)) {
state = STATE_RUNNING;
Operators.toFluxOrMono(p).subscribe(inner);
p = Operators.toFluxOrMono(p);
p.subscribe(inner);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public void subscribe(CoreSubscriber<? super T> actual) {
new NullPointerException("The single source Publisher is null"));
}
else {
Operators.toFluxOrMono(p).subscribe(actual);
p = Operators.toFluxOrMono(p);
p.subscribe(actual);
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ void subscribe(Publisher<? extends T>[] sources,

actual.onSubscribe(this);

Operators.toFluxOrMono(sources);

for (int i = 0; i < n; i++) {
if (cancelled || winner != Integer.MIN_VALUE) {
return;
Expand All @@ -237,7 +239,7 @@ void subscribe(Publisher<? extends T>[] sources,
return;
}

Operators.toFluxOrMono(sources[i]).subscribe(subscribers[i]);
sources[i].subscribe(subscribers[i]);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ void drain() {
new LeftRightEndSubscriber(this, true, idx);
cancellations.add(end);

Operators.toFluxOrMono(p).subscribe(end);
p = Operators.toFluxOrMono(p);
p.subscribe(end);

ex = error;
if (ex != null) {
Expand Down Expand Up @@ -404,7 +405,8 @@ else if (mode == RIGHT_VALUE) {
new LeftRightEndSubscriber(this, false, idx);
cancellations.add(end);

Operators.toFluxOrMono(p).subscribe(end);
p = Operators.toFluxOrMono(p);
p.subscribe(end);

ex = error;
if (ex != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -289,7 +288,8 @@ void drain() {
new LeftRightEndSubscriber(this, true, idx);
cancellations.add(end);

Operators.toFluxOrMono(p).subscribe(end);
p = Operators.toFluxOrMono(p);
p.subscribe(end);

ex = error;
if (ex != null) {
Expand Down Expand Up @@ -366,7 +366,8 @@ else if (mode == RIGHT_VALUE) {
new LeftRightEndSubscriber(this, false, idx);
cancellations.add(end);

Operators.toFluxOrMono(p).subscribe(end);
p = Operators.toFluxOrMono(p);
p.subscribe(end);

ex = error;
if (ex != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> act
return null;
}

Operators.toFluxOrMono(p).subscribe(other);
p = Operators.toFluxOrMono(p);
p.subscribe(other);

if (!main.cancelled) {
return main;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ static <T> void subscribe(CoreSubscriber<? super T> s,
return;
}

Operators.toFluxOrMono(p).subscribe(other);
p = Operators.toFluxOrMono(p);
p.subscribe(other);

if (!main.cancelled) {
wrapped.subscribe(main);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ public void onNext(T t) {
SampleFirstOther<U> other = new SampleFirstOther<>(this);

if (Operators.replace(OTHER, this, other)) {
Operators.toFluxOrMono(p).subscribe(other);
p = Operators.toFluxOrMono(p);
p.subscribe(other);
}
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ public void onNext(T t) {
SampleTimeoutOther<T, U> os = new SampleTimeoutOther<>(this, t, idx);

if (Operators.replace(OTHER, this, os)) {
Operators.toFluxOrMono(p).subscribe(os);
p = Operators.toFluxOrMono(p);
p.subscribe(os);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ public void onNext(T t) {

if (INNER.compareAndSet(this, si, innerSubscriber)) {
ACTIVE.getAndIncrement(this);
Operators.toFluxOrMono(p).subscribe(innerSubscriber);
p = Operators.toFluxOrMono(p);
p.subscribe(innerSubscriber);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ void subscribeInner(T nextElement, SwitchMapInner<T, R> nextInner, int nextIndex
return;
}

Operators.toFluxOrMono(p).subscribe(nextInner);
p = Operators.toFluxOrMono(p);
p.subscribe(nextInner);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ public final void onNext(T t) {
return;
}

final Publisher<? extends R> outboundPublisher;
Publisher<? extends R> outboundPublisher;
final SwitchOnFirstControlSubscriber<? super R> o = this.outboundSubscriber;

try {
Expand All @@ -542,7 +542,8 @@ public final void onNext(T t) {
return;
}

Operators.toFluxOrMono(outboundPublisher).subscribe(o);
outboundPublisher = Operators.toFluxOrMono(outboundPublisher);
outboundPublisher.subscribe(o);
return;
}

Expand Down Expand Up @@ -575,7 +576,7 @@ public final void onError(Throwable t) {
}

if (!hasFirstValueReceived(previousState)) {
final Publisher<? extends R> result;
Publisher<? extends R> result;
final CoreSubscriber<? super R> o = this.outboundSubscriber;
try {
final Signal<T> signal = Signal.error(t, o.currentContext());
Expand All @@ -586,7 +587,8 @@ public final void onError(Throwable t) {
return;
}

Operators.toFluxOrMono(result).subscribe(o);
result = Operators.toFluxOrMono(result);
result.subscribe(o);
}
}

Expand All @@ -611,7 +613,7 @@ public final void onComplete() {
}

if (!hasFirstValueReceived(previousState)) {
final Publisher<? extends R> result;
Publisher<? extends R> result;
final CoreSubscriber<? super R> o = this.outboundSubscriber;

try {
Expand All @@ -623,7 +625,8 @@ public final void onComplete() {
return;
}

Operators.toFluxOrMono(result).subscribe(o);
result = Operators.toFluxOrMono(result);
result.subscribe(o);
}
}

Expand Down Expand Up @@ -844,7 +847,7 @@ public boolean tryOnNext(T t) {
return true;
}

final Publisher<? extends R> result;
Publisher<? extends R> result;
final SwitchOnFirstControlSubscriber<? super R> o = this.outboundSubscriber;

try {
Expand All @@ -868,7 +871,8 @@ public boolean tryOnNext(T t) {
return true;
}

Operators.toFluxOrMono(result).subscribe(o);
result = Operators.toFluxOrMono(result);
result.subscribe(o);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Function;

Expand Down Expand Up @@ -91,7 +90,8 @@ public void subscribe(CoreSubscriber<? super T> actual) {
asyncCancel,
null);

Operators.toFluxOrMono(p).subscribe(subscriber);
p = Operators.toFluxOrMono(p);
p.subscribe(subscriber);
}
}
catch (Throwable e) {
Expand All @@ -101,8 +101,9 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}

//trigger the resource creation and delay the subscription to actual
Operators.toFluxOrMono(resourceSupplier).subscribe(new ResourceSubscriber(actual,
resourceClosure, asyncComplete, asyncError, asyncCancel, resourceSupplier instanceof Mono));
// + ensure onLastOperatorHook is called by invoking Publisher::subscribe(Subscriber)
((Publisher<?>) Operators.toFluxOrMono(resourceSupplier)).subscribe(
new ResourceSubscriber(actual, resourceClosure, asyncComplete, asyncError, asyncCancel, resourceSupplier instanceof Mono));
}

@Override
Expand Down Expand Up @@ -191,9 +192,10 @@ public void onNext(S resource) {
}
resourceProvided = true;

final Publisher<? extends T> p = deriveFluxFromResource(resource, resourceClosure);
Publisher<? extends T> p = deriveFluxFromResource(resource, resourceClosure);

Operators.toFluxOrMono(p).subscribe(FluxUsingWhen.<S, T>prepareSubscriberForResource(resource,
p = Operators.toFluxOrMono(p);
p.subscribe(FluxUsingWhen.<S, T>prepareSubscriberForResource(resource,
this.actual,
this.asyncComplete,
this.asyncError,
Expand Down Expand Up @@ -362,7 +364,8 @@ public void onError(Throwable t) {
return;
}

Operators.toFluxOrMono(p).subscribe(new RollbackInner(this, t));
p = Operators.toFluxOrMono(p);
p.subscribe(new RollbackInner(this, t));
}
}

Expand All @@ -382,7 +385,8 @@ public void onComplete() {
return;
}

Operators.toFluxOrMono(p).subscribe(new CommitInner(this));
p = Operators.toFluxOrMono(p);
p.subscribe(new CommitInner(this));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ void subscribeNextTrigger() {
final Function<? super T, ? extends Publisher<?>> generator =
this.otherGenerators[this.index];

final Publisher<?> p;
Publisher<?> p;

try {
p = generator.apply(this.value);
Expand All @@ -304,7 +304,8 @@ void subscribeNextTrigger() {
this.triggerSubscriber = triggerSubscriber;
}

Operators.toFluxOrMono(p).subscribe(triggerSubscriber);
p = Operators.toFluxOrMono(p);
p.subscribe(triggerSubscriber);
}

@Override
Expand Down
Loading
Loading