Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

InternalOperator automatic context propagation #3625

Merged
merged 5 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public String stepName() {
@Override
public final CoreSubscriber<? super I> subscribeOrReturn(CoreSubscriber<? super O> actual) {
CoreSubscriber<? super I> input =
liftFunction.lifter.apply(source, actual);
liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual));

Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public String stepName() {
@Override
public final CoreSubscriber<? super I> subscribeOrReturn(CoreSubscriber<? super O> actual) {
CoreSubscriber<? super I> input =
liftFunction.lifter.apply(source, actual);
liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual));

Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null");

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,7 +53,7 @@ final class FluxBufferBoundary<T, U, C extends Collection<? super T>>
Publisher<U> other,
Supplier<C> bufferSupplier) {
super(source);
this.other = Objects.requireNonNull(other, "other");
this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other"));
this.bufferSupplier = Objects.requireNonNull(bufferSupplier, "bufferSupplier");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ final class FluxBufferWhen<T, OPEN, CLOSE, BUFFER extends Collection<? super T>>
Supplier<BUFFER> bufferSupplier,
Supplier<? extends Queue<BUFFER>> queueSupplier) {
super(source);
this.start = Objects.requireNonNull(start, "start");
this.start = Operators.toFluxOrMono(Objects.requireNonNull(start, "start"));
this.end = Objects.requireNonNull(end, "end");
this.bufferSupplier = Objects.requireNonNull(bufferSupplier, "bufferSupplier");
this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier");
Expand Down Expand Up @@ -360,7 +360,7 @@ void open(OPEN token) {

BufferWhenCloseSubscriber<T, BUFFER> bc = new BufferWhenCloseSubscriber<>(this, idx);
subscribers.add(bc);
p.subscribe(bc);
Operators.toFluxOrMono(p).subscribe(bc);
}

void openComplete(BufferWhenOpenSubscriber<OPEN> os) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -448,7 +448,7 @@ void drain() {
}
else {
active = true;
p.subscribe(inner);
Operators.toFluxOrMono(p).subscribe(inner);
}
}
}
Expand Down Expand Up @@ -805,7 +805,7 @@ void drain() {
}
else {
active = true;
p.subscribe(inner);
Operators.toFluxOrMono(p).subscribe(inner);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -203,7 +203,7 @@ public void onNext(T t) {
return;
}

p.subscribe(inner);
Operators.toFluxOrMono(p).subscribe(inner);
}
catch (Throwable e) {
Context ctx = actual.currentContext();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,7 +40,7 @@ final class FluxDelaySubscription<T, U> extends InternalFluxOperator<T, T>

FluxDelaySubscription(Flux<? extends T> source, Publisher<U> other) {
super(source);
this.other = Objects.requireNonNull(other, "other");
this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -119,8 +119,8 @@ public void onNext(T t) {

Publisher<? extends T> p;
try {
p = Objects.requireNonNull(expander.apply(t),
"The expander returned a null Publisher");
p = Operators.toFluxOrMono(Objects.requireNonNull(expander.apply(t),
"The expander returned a null Publisher"));
}
catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -279,7 +279,7 @@ void drain() {
FilterWhenInner inner = new FilterWhenInner(this, !(p instanceof Mono));
if (CURRENT.compareAndSet(this,null, inner)) {
state = STATE_RUNNING;
p.subscribe(inner);
Operators.toFluxOrMono(p).subscribe(inner);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -81,7 +81,7 @@ final class FluxGroupJoin<TLeft, TRight, TLeftEnd, TRightEnd, R>
Supplier<? extends Queue<Object>> queueSupplier,
Supplier<? extends Queue<TRight>> processorQueueSupplier) {
super(source);
this.other = Objects.requireNonNull(other, "other");
this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other"));
this.leftEnd = Objects.requireNonNull(leftEnd, "leftEnd");
this.rightEnd = Objects.requireNonNull(rightEnd, "rightEnd");
this.processorQueueSupplier = Objects.requireNonNull(processorQueueSupplier, "processorQueueSupplier");
Expand Down Expand Up @@ -336,7 +336,7 @@ void drain() {
new LeftRightEndSubscriber(this, true, idx);
cancellations.add(end);

p.subscribe(end);
Operators.toFluxOrMono(p).subscribe(end);

ex = error;
if (ex != null) {
Expand Down Expand Up @@ -404,7 +404,7 @@ else if (mode == RIGHT_VALUE) {
new LeftRightEndSubscriber(this, false, idx);
cancellations.add(end);

p.subscribe(end);
Operators.toFluxOrMono(p).subscribe(end);

ex = error;
if (ex != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,7 +63,7 @@ final class FluxJoin<TLeft, TRight, TLeftEnd, TRightEnd, R> extends
Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd,
BiFunction<? super TLeft, ? super TRight, ? extends R> resultSelector) {
super(source);
this.other = Objects.requireNonNull(other, "other");
this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other"));
this.leftEnd = Objects.requireNonNull(leftEnd, "leftEnd");
this.rightEnd = Objects.requireNonNull(rightEnd, "rightEnd");
this.resultSelector = Objects.requireNonNull(resultSelector, "resultSelector");
Expand Down Expand Up @@ -289,7 +289,7 @@ void drain() {
new LeftRightEndSubscriber(this, true, idx);
cancellations.add(end);

p.subscribe(end);
Operators.toFluxOrMono(p).subscribe(end);

ex = error;
if (ex != null) {
Expand Down Expand Up @@ -366,7 +366,7 @@ else if (mode == RIGHT_VALUE) {
new LeftRightEndSubscriber(this, false, idx);
cancellations.add(end);

p.subscribe(end);
Operators.toFluxOrMono(p).subscribe(end);

ex = error;
if (ex != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,7 +53,7 @@ public Object scanUnsafe(Attr key) {
@Override
public CoreSubscriber<? super I> subscribeOrReturn(CoreSubscriber<? super O> actual) {
CoreSubscriber<? super I> input =
liftFunction.lifter.apply(source, actual);
liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual));

Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null");

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,7 +55,7 @@ public Object scanUnsafe(Attr key) {
@Override
public CoreSubscriber<? super I> subscribeOrReturn(CoreSubscriber<? super O> actual) {
CoreSubscriber<? super I> input =
liftFunction.lifter.apply(source, actual);
liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual));

Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null");

Expand All @@ -65,6 +65,6 @@ public CoreSubscriber<? super I> subscribeOrReturn(CoreSubscriber<? super O> act
input = new FluxHide.SuppressFuseableSubscriber<>(input);
}
//otherwise QS is not required or user already made a compatible conversion
return input;
return Operators.restoreContextOnSubscriberIfAutoCPEnabled(this, input);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public void onNext(T t) {
Publisher<? extends R> publisher;

try {
publisher = Objects.requireNonNull(mapper.apply(t), "publisher");
publisher = Operators.toFluxOrMono(Objects.requireNonNull(mapper.apply(t), "publisher"));
}
catch (Throwable ex) {
onError(Operators.onOperatorError(s, ex, t, actual.currentContext()));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -91,8 +91,8 @@ public void onError(Throwable t) {
Publisher<? extends T> p;

try {
p = Objects.requireNonNull(nextFactory.apply(t),
"The nextFactory returned a null Publisher");
p = Operators.toFluxOrMono(Objects.requireNonNull(nextFactory.apply(t),
"The nextFactory returned a null Publisher"));
}
catch (Throwable e) {
Throwable _e = Operators.onOperatorError(e, actual.currentContext());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -78,8 +78,9 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> act
queueSupplier,
actual.currentContext());

Publisher<? extends R> out = Objects.requireNonNull(transform.apply(multicast),
"The transform returned a null Publisher");
Publisher<? extends R> out = Operators.toFluxOrMono(Objects.requireNonNull(
transform.apply(multicast),
"The transform returned a null Publisher"));

if (out instanceof Fuseable) {
out.subscribe(new CancelFuseableMulticaster<>(actual, multicast));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> act
return null;
}

p.subscribe(other);
Operators.toFluxOrMono(p).subscribe(other);

if (!main.cancelled) {
return main;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.reactivestreams.Publisher;
import reactor.core.CorePublisher;
import reactor.core.CoreSubscriber;

Expand Down Expand Up @@ -114,7 +115,9 @@ void resubscribe() {
produced(c);
}

source.subscribe(this);
// Not wrapping source, but just the subscriber due to requirements
// in TailCallSubscribeTest#retry
source.subscribe(Operators.restoreContextOnSubscriberIfPublisherNonInternal(source, this));

} while (WIP.decrementAndGet(this) != 0);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,7 +52,7 @@ final class FluxSample<T, U> extends InternalFluxOperator<T, T> {

FluxSample(Flux<? extends T> source, Publisher<U> other) {
super(source);
this.other = Objects.requireNonNull(other, "other");
this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -191,7 +191,7 @@ public void onNext(T t) {
SampleFirstOther<U> other = new SampleFirstOther<>(this);

if (Operators.replace(OTHER, this, other)) {
p.subscribe(other);
Operators.toFluxOrMono(p).subscribe(other);
}
}
else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -207,7 +207,7 @@ public void onNext(T t) {
SampleTimeoutOther<T, U> os = new SampleTimeoutOther<>(this, t, idx);

if (Operators.replace(OTHER, this, os)) {
p.subscribe(os);
Operators.toFluxOrMono(p).subscribe(os);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,7 +42,7 @@ final class FluxSkipUntilOther<T, U> extends InternalFluxOperator<T, T> {

FluxSkipUntilOther(Flux<? extends T> source, Publisher<U> other) {
super(source);
this.other = Objects.requireNonNull(other, "other");
this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other"));
}

@Override
Expand Down
Loading
Loading