Skip to content

Commit

Permalink
InternalOperator automatic context propagation (#3625)
Browse files Browse the repository at this point in the history
This change revisits InternalMonoOperator and InternalFluxOperator
implementations and wraps their async sources with a context restoring
Subscriber implementation.

This is a follow-up for #3549
  • Loading branch information
chemicL authored Oct 31, 2023
1 parent d1557e0 commit 69ab66d
Show file tree
Hide file tree
Showing 50 changed files with 797 additions and 268 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public String stepName() {
@Override
public final CoreSubscriber<? super I> subscribeOrReturn(CoreSubscriber<? super O> actual) {
CoreSubscriber<? super I> input =
liftFunction.lifter.apply(source, actual);
liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual));

Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public String stepName() {
@Override
public final CoreSubscriber<? super I> subscribeOrReturn(CoreSubscriber<? super O> actual) {
CoreSubscriber<? super I> input =
liftFunction.lifter.apply(source, actual);
liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual));

Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null");

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,7 +53,7 @@ final class FluxBufferBoundary<T, U, C extends Collection<? super T>>
Publisher<U> other,
Supplier<C> bufferSupplier) {
super(source);
this.other = Objects.requireNonNull(other, "other");
this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other"));
this.bufferSupplier = Objects.requireNonNull(bufferSupplier, "bufferSupplier");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ final class FluxBufferWhen<T, OPEN, CLOSE, BUFFER extends Collection<? super T>>
Supplier<BUFFER> bufferSupplier,
Supplier<? extends Queue<BUFFER>> queueSupplier) {
super(source);
this.start = Objects.requireNonNull(start, "start");
this.start = Operators.toFluxOrMono(Objects.requireNonNull(start, "start"));
this.end = Objects.requireNonNull(end, "end");
this.bufferSupplier = Objects.requireNonNull(bufferSupplier, "bufferSupplier");
this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier");
Expand Down Expand Up @@ -360,7 +360,7 @@ void open(OPEN token) {

BufferWhenCloseSubscriber<T, BUFFER> bc = new BufferWhenCloseSubscriber<>(this, idx);
subscribers.add(bc);
p.subscribe(bc);
Operators.toFluxOrMono(p).subscribe(bc);
}

void openComplete(BufferWhenOpenSubscriber<OPEN> os) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -448,7 +448,7 @@ void drain() {
}
else {
active = true;
p.subscribe(inner);
Operators.toFluxOrMono(p).subscribe(inner);
}
}
}
Expand Down Expand Up @@ -805,7 +805,7 @@ void drain() {
}
else {
active = true;
p.subscribe(inner);
Operators.toFluxOrMono(p).subscribe(inner);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -203,7 +203,7 @@ public void onNext(T t) {
return;
}

p.subscribe(inner);
Operators.toFluxOrMono(p).subscribe(inner);
}
catch (Throwable e) {
Context ctx = actual.currentContext();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,7 +40,7 @@ final class FluxDelaySubscription<T, U> extends InternalFluxOperator<T, T>

FluxDelaySubscription(Flux<? extends T> source, Publisher<U> other) {
super(source);
this.other = Objects.requireNonNull(other, "other");
this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -119,8 +119,8 @@ public void onNext(T t) {

Publisher<? extends T> p;
try {
p = Objects.requireNonNull(expander.apply(t),
"The expander returned a null Publisher");
p = Operators.toFluxOrMono(Objects.requireNonNull(expander.apply(t),
"The expander returned a null Publisher"));
}
catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -279,7 +279,7 @@ void drain() {
FilterWhenInner inner = new FilterWhenInner(this, !(p instanceof Mono));
if (CURRENT.compareAndSet(this,null, inner)) {
state = STATE_RUNNING;
p.subscribe(inner);
Operators.toFluxOrMono(p).subscribe(inner);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -81,7 +81,7 @@ final class FluxGroupJoin<TLeft, TRight, TLeftEnd, TRightEnd, R>
Supplier<? extends Queue<Object>> queueSupplier,
Supplier<? extends Queue<TRight>> processorQueueSupplier) {
super(source);
this.other = Objects.requireNonNull(other, "other");
this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other"));
this.leftEnd = Objects.requireNonNull(leftEnd, "leftEnd");
this.rightEnd = Objects.requireNonNull(rightEnd, "rightEnd");
this.processorQueueSupplier = Objects.requireNonNull(processorQueueSupplier, "processorQueueSupplier");
Expand Down Expand Up @@ -336,7 +336,7 @@ void drain() {
new LeftRightEndSubscriber(this, true, idx);
cancellations.add(end);

p.subscribe(end);
Operators.toFluxOrMono(p).subscribe(end);

ex = error;
if (ex != null) {
Expand Down Expand Up @@ -404,7 +404,7 @@ else if (mode == RIGHT_VALUE) {
new LeftRightEndSubscriber(this, false, idx);
cancellations.add(end);

p.subscribe(end);
Operators.toFluxOrMono(p).subscribe(end);

ex = error;
if (ex != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,7 +63,7 @@ final class FluxJoin<TLeft, TRight, TLeftEnd, TRightEnd, R> extends
Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd,
BiFunction<? super TLeft, ? super TRight, ? extends R> resultSelector) {
super(source);
this.other = Objects.requireNonNull(other, "other");
this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other"));
this.leftEnd = Objects.requireNonNull(leftEnd, "leftEnd");
this.rightEnd = Objects.requireNonNull(rightEnd, "rightEnd");
this.resultSelector = Objects.requireNonNull(resultSelector, "resultSelector");
Expand Down Expand Up @@ -289,7 +289,7 @@ void drain() {
new LeftRightEndSubscriber(this, true, idx);
cancellations.add(end);

p.subscribe(end);
Operators.toFluxOrMono(p).subscribe(end);

ex = error;
if (ex != null) {
Expand Down Expand Up @@ -366,7 +366,7 @@ else if (mode == RIGHT_VALUE) {
new LeftRightEndSubscriber(this, false, idx);
cancellations.add(end);

p.subscribe(end);
Operators.toFluxOrMono(p).subscribe(end);

ex = error;
if (ex != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,7 +53,7 @@ public Object scanUnsafe(Attr key) {
@Override
public CoreSubscriber<? super I> subscribeOrReturn(CoreSubscriber<? super O> actual) {
CoreSubscriber<? super I> input =
liftFunction.lifter.apply(source, actual);
liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual));

Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null");

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,7 +55,7 @@ public Object scanUnsafe(Attr key) {
@Override
public CoreSubscriber<? super I> subscribeOrReturn(CoreSubscriber<? super O> actual) {
CoreSubscriber<? super I> input =
liftFunction.lifter.apply(source, actual);
liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual));

Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null");

Expand All @@ -65,6 +65,6 @@ public CoreSubscriber<? super I> subscribeOrReturn(CoreSubscriber<? super O> act
input = new FluxHide.SuppressFuseableSubscriber<>(input);
}
//otherwise QS is not required or user already made a compatible conversion
return input;
return Operators.restoreContextOnSubscriberIfAutoCPEnabled(this, input);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public void onNext(T t) {
Publisher<? extends R> publisher;

try {
publisher = Objects.requireNonNull(mapper.apply(t), "publisher");
publisher = Operators.toFluxOrMono(Objects.requireNonNull(mapper.apply(t), "publisher"));
}
catch (Throwable ex) {
onError(Operators.onOperatorError(s, ex, t, actual.currentContext()));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -91,8 +91,8 @@ public void onError(Throwable t) {
Publisher<? extends T> p;

try {
p = Objects.requireNonNull(nextFactory.apply(t),
"The nextFactory returned a null Publisher");
p = Operators.toFluxOrMono(Objects.requireNonNull(nextFactory.apply(t),
"The nextFactory returned a null Publisher"));
}
catch (Throwable e) {
Throwable _e = Operators.onOperatorError(e, actual.currentContext());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -78,8 +78,9 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> act
queueSupplier,
actual.currentContext());

Publisher<? extends R> out = Objects.requireNonNull(transform.apply(multicast),
"The transform returned a null Publisher");
Publisher<? extends R> out = Operators.toFluxOrMono(Objects.requireNonNull(
transform.apply(multicast),
"The transform returned a null Publisher"));

if (out instanceof Fuseable) {
out.subscribe(new CancelFuseableMulticaster<>(actual, multicast));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> act
return null;
}

p.subscribe(other);
Operators.toFluxOrMono(p).subscribe(other);

if (!main.cancelled) {
return main;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.reactivestreams.Publisher;
import reactor.core.CorePublisher;
import reactor.core.CoreSubscriber;

Expand Down Expand Up @@ -114,7 +115,9 @@ void resubscribe() {
produced(c);
}

source.subscribe(this);
// Not wrapping source, but just the subscriber due to requirements
// in TailCallSubscribeTest#retry
source.subscribe(Operators.restoreContextOnSubscriberIfPublisherNonInternal(source, this));

} while (WIP.decrementAndGet(this) != 0);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,7 +52,7 @@ final class FluxSample<T, U> extends InternalFluxOperator<T, T> {

FluxSample(Flux<? extends T> source, Publisher<U> other) {
super(source);
this.other = Objects.requireNonNull(other, "other");
this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -191,7 +191,7 @@ public void onNext(T t) {
SampleFirstOther<U> other = new SampleFirstOther<>(this);

if (Operators.replace(OTHER, this, other)) {
p.subscribe(other);
Operators.toFluxOrMono(p).subscribe(other);
}
}
else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -207,7 +207,7 @@ public void onNext(T t) {
SampleTimeoutOther<T, U> os = new SampleTimeoutOther<>(this, t, idx);

if (Operators.replace(OTHER, this, os)) {
p.subscribe(os);
Operators.toFluxOrMono(p).subscribe(os);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,7 +42,7 @@ final class FluxSkipUntilOther<T, U> extends InternalFluxOperator<T, T> {

FluxSkipUntilOther(Flux<? extends T> source, Publisher<U> other) {
super(source);
this.other = Objects.requireNonNull(other, "other");
this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other"));
}

@Override
Expand Down
Loading

0 comments on commit 69ab66d

Please sign in to comment.