diff --git a/reactor-core/src/main/java/reactor/core/publisher/ConnectableLift.java b/reactor-core/src/main/java/reactor/core/publisher/ConnectableLift.java index 35d7e86bc7..9523098638 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/ConnectableLift.java +++ b/reactor-core/src/main/java/reactor/core/publisher/ConnectableLift.java @@ -69,7 +69,7 @@ public String stepName() { @Override public final CoreSubscriber subscribeOrReturn(CoreSubscriber actual) { CoreSubscriber input = - liftFunction.lifter.apply(source, actual); + liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual)); Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null"); diff --git a/reactor-core/src/main/java/reactor/core/publisher/ConnectableLiftFuseable.java b/reactor-core/src/main/java/reactor/core/publisher/ConnectableLiftFuseable.java index b73831c3b9..60b748292d 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/ConnectableLiftFuseable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/ConnectableLiftFuseable.java @@ -71,7 +71,7 @@ public String stepName() { @Override public final CoreSubscriber subscribeOrReturn(CoreSubscriber actual) { CoreSubscriber input = - liftFunction.lifter.apply(source, actual); + liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual)); Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null"); diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferBoundary.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferBoundary.java index 4f81dc94d2..c0c4a04e50 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferBoundary.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferBoundary.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,7 +53,7 @@ final class FluxBufferBoundary> Publisher other, Supplier bufferSupplier) { super(source); - this.other = Objects.requireNonNull(other, "other"); + this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other")); this.bufferSupplier = Objects.requireNonNull(bufferSupplier, "bufferSupplier"); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferWhen.java b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferWhen.java index 8f92516842..b9379073a1 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxBufferWhen.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxBufferWhen.java @@ -65,7 +65,7 @@ final class FluxBufferWhen> Supplier bufferSupplier, Supplier> queueSupplier) { super(source); - this.start = Objects.requireNonNull(start, "start"); + this.start = Operators.toFluxOrMono(Objects.requireNonNull(start, "start")); this.end = Objects.requireNonNull(end, "end"); this.bufferSupplier = Objects.requireNonNull(bufferSupplier, "bufferSupplier"); this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier"); @@ -360,7 +360,7 @@ void open(OPEN token) { BufferWhenCloseSubscriber bc = new BufferWhenCloseSubscriber<>(this, idx); subscribers.add(bc); - p.subscribe(bc); + Operators.toFluxOrMono(p).subscribe(bc); } void openComplete(BufferWhenOpenSubscriber os) { diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxConcatMap.java b/reactor-core/src/main/java/reactor/core/publisher/FluxConcatMap.java index 9dde7207dd..b07d8e87e2 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxConcatMap.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxConcatMap.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -448,7 +448,7 @@ void drain() { } else { active = true; - p.subscribe(inner); + Operators.toFluxOrMono(p).subscribe(inner); } } } @@ -805,7 +805,7 @@ void drain() { } else { active = true; - p.subscribe(inner); + Operators.toFluxOrMono(p).subscribe(inner); } } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxConcatMapNoPrefetch.java b/reactor-core/src/main/java/reactor/core/publisher/FluxConcatMapNoPrefetch.java index 222f60fc23..59ba901353 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxConcatMapNoPrefetch.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxConcatMapNoPrefetch.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2020-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -203,7 +203,7 @@ public void onNext(T t) { return; } - p.subscribe(inner); + Operators.toFluxOrMono(p).subscribe(inner); } catch (Throwable e) { Context ctx = actual.currentContext(); diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxDelaySubscription.java b/reactor-core/src/main/java/reactor/core/publisher/FluxDelaySubscription.java index 080e58670d..8a6a2c1df7 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxDelaySubscription.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxDelaySubscription.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,7 +40,7 @@ final class FluxDelaySubscription extends InternalFluxOperator FluxDelaySubscription(Flux source, Publisher other) { super(source); - this.other = Objects.requireNonNull(other, "other"); + this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other")); } @Override diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxExpand.java b/reactor-core/src/main/java/reactor/core/publisher/FluxExpand.java index d12f95b370..fe9fea4723 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxExpand.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxExpand.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -119,8 +119,8 @@ public void onNext(T t) { Publisher p; try { - p = Objects.requireNonNull(expander.apply(t), - "The expander returned a null Publisher"); + p = Operators.toFluxOrMono(Objects.requireNonNull(expander.apply(t), + "The expander returned a null Publisher")); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxFilterWhen.java b/reactor-core/src/main/java/reactor/core/publisher/FluxFilterWhen.java index a7b57c89b8..3d2bec6dac 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxFilterWhen.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxFilterWhen.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -279,7 +279,7 @@ void drain() { FilterWhenInner inner = new FilterWhenInner(this, !(p instanceof Mono)); if (CURRENT.compareAndSet(this,null, inner)) { state = STATE_RUNNING; - p.subscribe(inner); + Operators.toFluxOrMono(p).subscribe(inner); break; } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxGroupJoin.java b/reactor-core/src/main/java/reactor/core/publisher/FluxGroupJoin.java index d1f4b30356..c88650b462 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxGroupJoin.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxGroupJoin.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -81,7 +81,7 @@ final class FluxGroupJoin Supplier> queueSupplier, Supplier> processorQueueSupplier) { super(source); - this.other = Objects.requireNonNull(other, "other"); + this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other")); this.leftEnd = Objects.requireNonNull(leftEnd, "leftEnd"); this.rightEnd = Objects.requireNonNull(rightEnd, "rightEnd"); this.processorQueueSupplier = Objects.requireNonNull(processorQueueSupplier, "processorQueueSupplier"); @@ -336,7 +336,7 @@ void drain() { new LeftRightEndSubscriber(this, true, idx); cancellations.add(end); - p.subscribe(end); + Operators.toFluxOrMono(p).subscribe(end); ex = error; if (ex != null) { @@ -404,7 +404,7 @@ else if (mode == RIGHT_VALUE) { new LeftRightEndSubscriber(this, false, idx); cancellations.add(end); - p.subscribe(end); + Operators.toFluxOrMono(p).subscribe(end); ex = error; if (ex != null) { diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxJoin.java b/reactor-core/src/main/java/reactor/core/publisher/FluxJoin.java index d85e16bb1f..b01678a6fc 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxJoin.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxJoin.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -63,7 +63,7 @@ final class FluxJoin extends Function> rightEnd, BiFunction resultSelector) { super(source); - this.other = Objects.requireNonNull(other, "other"); + this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other")); this.leftEnd = Objects.requireNonNull(leftEnd, "leftEnd"); this.rightEnd = Objects.requireNonNull(rightEnd, "rightEnd"); this.resultSelector = Objects.requireNonNull(resultSelector, "resultSelector"); @@ -289,7 +289,7 @@ void drain() { new LeftRightEndSubscriber(this, true, idx); cancellations.add(end); - p.subscribe(end); + Operators.toFluxOrMono(p).subscribe(end); ex = error; if (ex != null) { @@ -366,7 +366,7 @@ else if (mode == RIGHT_VALUE) { new LeftRightEndSubscriber(this, false, idx); cancellations.add(end); - p.subscribe(end); + Operators.toFluxOrMono(p).subscribe(end); ex = error; if (ex != null) { diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxLift.java b/reactor-core/src/main/java/reactor/core/publisher/FluxLift.java index ce9da76ac2..f127974310 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxLift.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxLift.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,7 +53,7 @@ public Object scanUnsafe(Attr key) { @Override public CoreSubscriber subscribeOrReturn(CoreSubscriber actual) { CoreSubscriber input = - liftFunction.lifter.apply(source, actual); + liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual)); Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null"); diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxLiftFuseable.java b/reactor-core/src/main/java/reactor/core/publisher/FluxLiftFuseable.java index f5eff2cf85..a3aedc88eb 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxLiftFuseable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxLiftFuseable.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,7 +55,7 @@ public Object scanUnsafe(Attr key) { @Override public CoreSubscriber subscribeOrReturn(CoreSubscriber actual) { CoreSubscriber input = - liftFunction.lifter.apply(source, actual); + liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual)); Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null"); @@ -65,6 +65,6 @@ public CoreSubscriber subscribeOrReturn(CoreSubscriber act input = new FluxHide.SuppressFuseableSubscriber<>(input); } //otherwise QS is not required or user already made a compatible conversion - return input; + return Operators.restoreContextOnSubscriberIfAutoCPEnabled(this, input); } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxMergeSequential.java b/reactor-core/src/main/java/reactor/core/publisher/FluxMergeSequential.java index 50aa3ce830..46c4899e97 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxMergeSequential.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxMergeSequential.java @@ -205,7 +205,7 @@ public void onNext(T t) { Publisher publisher; try { - publisher = Objects.requireNonNull(mapper.apply(t), "publisher"); + publisher = Operators.toFluxOrMono(Objects.requireNonNull(mapper.apply(t), "publisher")); } catch (Throwable ex) { onError(Operators.onOperatorError(s, ex, t, actual.currentContext())); diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxOnErrorResume.java b/reactor-core/src/main/java/reactor/core/publisher/FluxOnErrorResume.java index 28a4e60764..a09c3aa08c 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxOnErrorResume.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxOnErrorResume.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -91,8 +91,8 @@ public void onError(Throwable t) { Publisher p; try { - p = Objects.requireNonNull(nextFactory.apply(t), - "The nextFactory returned a null Publisher"); + p = Operators.toFluxOrMono(Objects.requireNonNull(nextFactory.apply(t), + "The nextFactory returned a null Publisher")); } catch (Throwable e) { Throwable _e = Operators.onOperatorError(e, actual.currentContext()); diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxPublishMulticast.java b/reactor-core/src/main/java/reactor/core/publisher/FluxPublishMulticast.java index 6bd338f16a..21ce7db642 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxPublishMulticast.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxPublishMulticast.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -78,8 +78,9 @@ public CoreSubscriber subscribeOrReturn(CoreSubscriber act queueSupplier, actual.currentContext()); - Publisher out = Objects.requireNonNull(transform.apply(multicast), - "The transform returned a null Publisher"); + Publisher out = Operators.toFluxOrMono(Objects.requireNonNull( + transform.apply(multicast), + "The transform returned a null Publisher")); if (out instanceof Fuseable) { out.subscribe(new CancelFuseableMulticaster<>(actual, multicast)); diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxRepeatWhen.java b/reactor-core/src/main/java/reactor/core/publisher/FluxRepeatWhen.java index 30726f9ed7..2d240732b6 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxRepeatWhen.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxRepeatWhen.java @@ -76,7 +76,7 @@ public CoreSubscriber subscribeOrReturn(CoreSubscriber act return null; } - p.subscribe(other); + Operators.toFluxOrMono(p).subscribe(other); if (!main.cancelled) { return main; diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxRetry.java b/reactor-core/src/main/java/reactor/core/publisher/FluxRetry.java index ec96045a55..4efa85dc8c 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxRetry.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxRetry.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.reactivestreams.Publisher; import reactor.core.CorePublisher; import reactor.core.CoreSubscriber; @@ -114,7 +115,9 @@ void resubscribe() { produced(c); } - source.subscribe(this); + // Not wrapping source, but just the subscriber due to requirements + // in TailCallSubscribeTest#retry + source.subscribe(Operators.restoreContextOnSubscriberIfPublisherNonInternal(source, this)); } while (WIP.decrementAndGet(this) != 0); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxSample.java b/reactor-core/src/main/java/reactor/core/publisher/FluxSample.java index 01ffa0fd34..84e8b63fac 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxSample.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxSample.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,7 +52,7 @@ final class FluxSample extends InternalFluxOperator { FluxSample(Flux source, Publisher other) { super(source); - this.other = Objects.requireNonNull(other, "other"); + this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other")); } @Override diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxSampleFirst.java b/reactor-core/src/main/java/reactor/core/publisher/FluxSampleFirst.java index c346f75002..a3027c86d0 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxSampleFirst.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxSampleFirst.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -191,7 +191,7 @@ public void onNext(T t) { SampleFirstOther other = new SampleFirstOther<>(this); if (Operators.replace(OTHER, this, other)) { - p.subscribe(other); + Operators.toFluxOrMono(p).subscribe(other); } } else { diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxSampleTimeout.java b/reactor-core/src/main/java/reactor/core/publisher/FluxSampleTimeout.java index 58d2fcc952..ccd51f7200 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxSampleTimeout.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxSampleTimeout.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -207,7 +207,7 @@ public void onNext(T t) { SampleTimeoutOther os = new SampleTimeoutOther<>(this, t, idx); if (Operators.replace(OTHER, this, os)) { - p.subscribe(os); + Operators.toFluxOrMono(p).subscribe(os); } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxSkipUntilOther.java b/reactor-core/src/main/java/reactor/core/publisher/FluxSkipUntilOther.java index e1865725f2..1aa25886af 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxSkipUntilOther.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxSkipUntilOther.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,7 +42,7 @@ final class FluxSkipUntilOther extends InternalFluxOperator { FluxSkipUntilOther(Flux source, Publisher other) { super(source); - this.other = Objects.requireNonNull(other, "other"); + this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other")); } @Override diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchIfEmpty.java b/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchIfEmpty.java index da13faedb7..a94a412a3c 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchIfEmpty.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchIfEmpty.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,7 @@ final class FluxSwitchIfEmpty extends InternalFluxOperator { FluxSwitchIfEmpty(Flux source, Publisher other) { super(source); - this.other = Objects.requireNonNull(other, "other"); + this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other")); } @Override diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchMap.java b/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchMap.java index e703b43b31..39c3f8a7cb 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchMap.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchMap.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -233,7 +233,7 @@ public void onNext(T t) { if (INNER.compareAndSet(this, si, innerSubscriber)) { ACTIVE.getAndIncrement(this); - p.subscribe(innerSubscriber); + Operators.toFluxOrMono(p).subscribe(innerSubscriber); } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchMapNoPrefetch.java b/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchMapNoPrefetch.java index 404f86eab5..06620047f8 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchMapNoPrefetch.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxSwitchMapNoPrefetch.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2021-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -215,7 +215,7 @@ void subscribeInner(T nextElement, SwitchMapInner nextInner, int nextIndex return; } - p.subscribe(nextInner); + Operators.toFluxOrMono(p).subscribe(nextInner); } @Override diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxTakeUntilOther.java b/reactor-core/src/main/java/reactor/core/publisher/FluxTakeUntilOther.java index 86f7a50644..2c999c18f9 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxTakeUntilOther.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxTakeUntilOther.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,7 +40,7 @@ final class FluxTakeUntilOther extends InternalFluxOperator { FluxTakeUntilOther(Flux source, Publisher other) { super(source); - this.other = Objects.requireNonNull(other, "other"); + this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other")); } @Override diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxTimeout.java b/reactor-core/src/main/java/reactor/core/publisher/FluxTimeout.java index d139f5fde2..d1482d4c8b 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxTimeout.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxTimeout.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,7 +56,8 @@ final class FluxTimeout extends InternalFluxOperator { Function> itemTimeout, String timeoutDescription) { super(source); - this.firstTimeout = Objects.requireNonNull(firstTimeout, "firstTimeout"); + this.firstTimeout = Operators.toFluxOrMono(Objects.requireNonNull(firstTimeout, + "firstTimeout")); this.itemTimeout = Objects.requireNonNull(itemTimeout, "itemTimeout"); this.other = null; @@ -69,9 +70,9 @@ final class FluxTimeout extends InternalFluxOperator { Function> itemTimeout, Publisher other) { super(source); - this.firstTimeout = Objects.requireNonNull(firstTimeout, "firstTimeout"); + this.firstTimeout = Operators.toFluxOrMono(Objects.requireNonNull(firstTimeout, "firstTimeout")); this.itemTimeout = Objects.requireNonNull(itemTimeout, "itemTimeout"); - this.other = Objects.requireNonNull(other, "other"); + this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other")); this.timeoutDescription = null; } @@ -199,7 +200,7 @@ public void onNext(T t) { return; } - p.subscribe(ts); + Operators.toFluxOrMono(p).subscribe(ts); } @Override diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxWindowBoundary.java b/reactor-core/src/main/java/reactor/core/publisher/FluxWindowBoundary.java index f3a1954c5d..ec05189340 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxWindowBoundary.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxWindowBoundary.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,7 +55,7 @@ final class FluxWindowBoundary extends InternalFluxOperator> { FluxWindowBoundary(Flux source, Publisher other, Supplier> processorQueueSupplier) { super(source); - this.other = Objects.requireNonNull(other, "other"); + this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other")); this.processorQueueSupplier = Objects.requireNonNull(processorQueueSupplier, "processorQueueSupplier"); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxWindowWhen.java b/reactor-core/src/main/java/reactor/core/publisher/FluxWindowWhen.java index cc92466e16..16d136d30a 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxWindowWhen.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxWindowWhen.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,7 +61,7 @@ final class FluxWindowWhen extends InternalFluxOperator> { Function> end, Supplier> processorQueueSupplier) { super(source); - this.start = Objects.requireNonNull(start, "start"); + this.start = Operators.toFluxOrMono(Objects.requireNonNull(start, "start")); this.end = Objects.requireNonNull(end, "end"); this.processorQueueSupplier = Objects.requireNonNull(processorQueueSupplier, "processorQueueSupplier"); @@ -308,7 +308,7 @@ void drainLoop() { if (resources.add(cl)) { OPEN_WINDOW_COUNT.getAndIncrement(this); - p.subscribe(cl); + Operators.toFluxOrMono(p).subscribe(cl); } continue; diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxWithLatestFrom.java b/reactor-core/src/main/java/reactor/core/publisher/FluxWithLatestFrom.java index 8b4b2eb4af..8693622717 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxWithLatestFrom.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxWithLatestFrom.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -57,7 +57,7 @@ final class FluxWithLatestFrom extends InternalFluxOperator { Publisher other, BiFunction combiner) { super(source); - this.other = Objects.requireNonNull(other, "other"); + this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other")); this.combiner = Objects.requireNonNull(combiner, "combiner"); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/GroupedLift.java b/reactor-core/src/main/java/reactor/core/publisher/GroupedLift.java index a74c73cd50..4f82af4696 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/GroupedLift.java +++ b/reactor-core/src/main/java/reactor/core/publisher/GroupedLift.java @@ -80,7 +80,7 @@ public String stepName() { @Override public void subscribe(CoreSubscriber actual) { CoreSubscriber input = - liftFunction.lifter.apply(source, actual); + liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual)); Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null"); diff --git a/reactor-core/src/main/java/reactor/core/publisher/GroupedLiftFuseable.java b/reactor-core/src/main/java/reactor/core/publisher/GroupedLiftFuseable.java index 3b1cbbfa0e..4eaf1f0618 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/GroupedLiftFuseable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/GroupedLiftFuseable.java @@ -82,7 +82,7 @@ public String stepName() { @Override public void subscribe(CoreSubscriber actual) { CoreSubscriber input = - liftFunction.lifter.apply(source, actual); + liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual)); Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null"); diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoCacheInvalidateWhen.java b/reactor-core/src/main/java/reactor/core/publisher/MonoCacheInvalidateWhen.java index 0c9ea70dce..8d03641aca 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoCacheInvalidateWhen.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoCacheInvalidateWhen.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2021-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -282,6 +282,9 @@ void cacheLoad(T value) { for (@SuppressWarnings("unchecked") CacheMonoSubscriber inner : SUBSCRIBERS.getAndSet(this, COORDINATOR_DONE)) { inner.complete(value); } + // even though the trigger can deliver values on different threads, + // it's not causing any delivery to downstream, so we don't need to + // wrap it invalidateTrigger.subscribe(new TriggerSubscriber(this.main)); } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoDelaySubscription.java b/reactor-core/src/main/java/reactor/core/publisher/MonoDelaySubscription.java index 09697964f9..461b8d62b2 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoDelaySubscription.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoDelaySubscription.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,7 +38,7 @@ final class MonoDelaySubscription extends InternalMonoOperator MonoDelaySubscription(Mono source, Publisher other) { super(source); - this.other = Objects.requireNonNull(other, "other"); + this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other")); } @Override diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoFilterWhen.java b/reactor-core/src/main/java/reactor/core/publisher/MonoFilterWhen.java index 7865886443..093c54c25a 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoFilterWhen.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoFilterWhen.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -143,7 +143,7 @@ public void onNext(T t) { } else { FilterWhenInner inner = new FilterWhenInner<>(this, !(p instanceof Mono), t); - p.subscribe(inner); + Operators.toFluxOrMono(p).subscribe(inner); } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoFlatMap.java b/reactor-core/src/main/java/reactor/core/publisher/MonoFlatMap.java index 8d30a5d688..9f85b847c3 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoFlatMap.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoFlatMap.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -162,7 +162,7 @@ public void onNext(T t) { } try { - m.subscribe(new FlatMapInner<>(this)); + Mono.fromDirect(m).subscribe(new FlatMapInner<>(this)); } catch (Throwable e) { actual.onError(Operators.onOperatorError(this, e, t, diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoFlatMapMany.java b/reactor-core/src/main/java/reactor/core/publisher/MonoFlatMapMany.java index fa671d6c40..08d6d1dd9a 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoFlatMapMany.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoFlatMapMany.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -192,7 +192,7 @@ public void onNext(T t) { return; } - p.subscribe(new FlatMapManyInner<>(this, actual)); + Operators.toFluxOrMono(p).subscribe(new FlatMapManyInner<>(this, actual)); } @Override diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoLift.java b/reactor-core/src/main/java/reactor/core/publisher/MonoLift.java index b47b672131..697804dbf4 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoLift.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoLift.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,7 +38,7 @@ final class MonoLift extends InternalMonoOperator { @Override public CoreSubscriber subscribeOrReturn(CoreSubscriber actual) { CoreSubscriber input = - liftFunction.lifter.apply(source, actual); + liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual)); Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null"); diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoLiftFuseable.java b/reactor-core/src/main/java/reactor/core/publisher/MonoLiftFuseable.java index 7bc535a244..93da526312 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoLiftFuseable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoLiftFuseable.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,8 +55,7 @@ public Object scanUnsafe(Attr key) { @Override public CoreSubscriber subscribeOrReturn(CoreSubscriber actual) { - - CoreSubscriber input = liftFunction.lifter.apply(source, actual); + CoreSubscriber input = liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual)); Objects.requireNonNull(input, "Lifted subscriber MUST NOT be null"); diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoPublishMulticast.java b/reactor-core/src/main/java/reactor/core/publisher/MonoPublishMulticast.java index 02f471eae8..ccf5999e04 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoPublishMulticast.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoPublishMulticast.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,8 +53,8 @@ final class MonoPublishMulticast extends InternalMonoOperator implem public CoreSubscriber subscribeOrReturn(CoreSubscriber actual) { MonoPublishMulticaster multicast = new MonoPublishMulticaster<>(actual.currentContext()); - Mono out = Objects.requireNonNull(transform.apply(fromDirect(multicast)), - "The transform returned a null Mono"); + Mono out = fromDirect( + Objects.requireNonNull(transform.apply(fromDirect(multicast)), "The transform returned a null Mono")); if (out instanceof Fuseable) { out.subscribe(new FluxPublishMulticast.CancelFuseableMulticaster<>(actual, multicast)); diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoSwitchIfEmpty.java b/reactor-core/src/main/java/reactor/core/publisher/MonoSwitchIfEmpty.java index 42eb454489..e485f220da 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoSwitchIfEmpty.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoSwitchIfEmpty.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,7 +32,7 @@ final class MonoSwitchIfEmpty extends InternalMonoOperator { MonoSwitchIfEmpty(Mono source, Mono other) { super(source); - this.other = Objects.requireNonNull(other, "other"); + this.other = fromDirect(Objects.requireNonNull(other, "other")); } @Override diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoTakeUntilOther.java b/reactor-core/src/main/java/reactor/core/publisher/MonoTakeUntilOther.java index 5962d7ef34..0e3a448944 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoTakeUntilOther.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoTakeUntilOther.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,7 +35,7 @@ final class MonoTakeUntilOther extends InternalMonoOperator { MonoTakeUntilOther(Mono source, Publisher other) { super(source); - this.other = Objects.requireNonNull(other, "other"); + this.other = Operators.toFluxOrMono(Objects.requireNonNull(other, "other")); } @Override diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoTimeout.java b/reactor-core/src/main/java/reactor/core/publisher/MonoTimeout.java index 24ccfe8912..97d0569b4d 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoTimeout.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoTimeout.java @@ -48,7 +48,7 @@ final class MonoTimeout extends InternalMonoOperator { Publisher firstTimeout, String timeoutDescription) { super(source); - this.firstTimeout = Objects.requireNonNull(firstTimeout, "firstTimeout"); + this.firstTimeout = Mono.fromDirect(Objects.requireNonNull(firstTimeout, "firstTimeout")); this.other = null; this.timeoutDescription = timeoutDescription; } @@ -57,8 +57,8 @@ final class MonoTimeout extends InternalMonoOperator { Publisher firstTimeout, Publisher other) { super(source); - this.firstTimeout = Objects.requireNonNull(firstTimeout, "firstTimeout"); - this.other = Objects.requireNonNull(other, "other"); + this.firstTimeout = Mono.fromDirect(Objects.requireNonNull(firstTimeout, "firstTimeout")); + this.other = Mono.fromDirect(Objects.requireNonNull(other, "other")); this.timeoutDescription = null; } diff --git a/reactor-core/src/main/java/reactor/core/publisher/Operators.java b/reactor-core/src/main/java/reactor/core/publisher/Operators.java index 3d60285c48..0763de1185 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Operators.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Operators.java @@ -2701,7 +2701,7 @@ static final LiftFunction liftScannable( } BiFunction, ? extends CoreSubscriber> - effectiveLifter = (pub, sub) -> lifter.apply(Scannable.from(pub), sub); + effectiveLifter = (pub, sub) -> lifter.apply(Scannable.from(pub), restoreContextOnSubscriberIfAutoCPEnabled(pub, sub)); return new LiftFunction<>(effectiveFilter, effectiveLifter, lifter.toString()); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/ParallelLift.java b/reactor-core/src/main/java/reactor/core/publisher/ParallelLift.java index 83fa0b940b..29375a0535 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/ParallelLift.java +++ b/reactor-core/src/main/java/reactor/core/publisher/ParallelLift.java @@ -62,9 +62,9 @@ public Object scanUnsafe(Attr key) { if (key == Attr.LIFTER) { return liftFunction.name; } - // We don't control what the lifter does, so we play it safe. - if (key == InternalProducerAttr.INSTANCE) return false; - + if (key == InternalProducerAttr.INSTANCE) { + return true; + } return null; } @@ -83,13 +83,9 @@ public void subscribe(CoreSubscriber[] s) { int i = 0; while (i < subscribers.length) { - // As this is not an INTERNAL_PRODUCER, the subscribers should be protected - // in case of automatic context propagation. - // If a user directly subscribes with a set of rails, there is no - // protection against that, so a ThreadLocal restoring subscriber would - // need to be provided. subscribers[i] = - Objects.requireNonNull(liftFunction.lifter.apply(source, s[i]), + Objects.requireNonNull(liftFunction.lifter.apply(source, + Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, s[i])), "Lifted subscriber MUST NOT be null"); i++; } diff --git a/reactor-core/src/main/java/reactor/core/publisher/ParallelLiftFuseable.java b/reactor-core/src/main/java/reactor/core/publisher/ParallelLiftFuseable.java index 2d3ae6c28c..c345fbb93d 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/ParallelLiftFuseable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/ParallelLiftFuseable.java @@ -65,9 +65,9 @@ public Object scanUnsafe(Attr key) { if (key == Attr.LIFTER) { return liftFunction.name; } - // We don't control what the lifter does, so we play it safe. - if (key == InternalProducerAttr.INSTANCE) return false; - + if (key == InternalProducerAttr.INSTANCE) { + return true; + } return null; } @@ -87,13 +87,8 @@ public void subscribe(CoreSubscriber[] s) { int i = 0; while (i < subscribers.length) { CoreSubscriber actual = s[i]; - // As this is not an INTERNAL_PRODUCER, the subscribers should be protected - // in case of automatic context propagation. - // If a user directly subscribes with a set of rails, there is no - // protection against that, so a ThreadLocal restoring subscriber would - // need to be provided. CoreSubscriber converted = - Objects.requireNonNull(liftFunction.lifter.apply(source, actual), + Objects.requireNonNull(liftFunction.lifter.apply(source, Operators.restoreContextOnSubscriberIfAutoCPEnabled(source, actual)), "Lifted subscriber MUST NOT be null"); Objects.requireNonNull(converted, "Lifted subscriber MUST NOT be null"); diff --git a/reactor-core/src/main/java/reactor/core/publisher/SignalLogger.java b/reactor-core/src/main/java/reactor/core/publisher/SignalLogger.java index c335128644..ccfe47fdc0 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/SignalLogger.java +++ b/reactor-core/src/main/java/reactor/core/publisher/SignalLogger.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package reactor.core.publisher; +import java.time.Instant; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; diff --git a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/AutomaticContextPropagationTest.java b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/AutomaticContextPropagationTest.java index 988bb0aacc..61e45b7057 100644 --- a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/AutomaticContextPropagationTest.java +++ b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/AutomaticContextPropagationTest.java @@ -18,6 +18,7 @@ import java.io.File; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -39,6 +40,7 @@ import java.util.stream.Stream; import io.micrometer.context.ContextRegistry; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -48,6 +50,8 @@ import org.junit.jupiter.api.Test; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.CorePublisher; import reactor.core.CoreSubscriber; import reactor.core.Fuseable; @@ -59,6 +63,7 @@ import reactor.util.function.Tuples; import reactor.util.retry.Retry; +import static org.assertj.core.api.Assertions.anyOf; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNoException; @@ -317,7 +322,7 @@ class NonReactorFluxOrMono { @BeforeEach void enableAutomaticContextPropagation() { - executorService = Executors.newFixedThreadPool(3); + executorService = Executors.newSingleThreadExecutor(); } @AfterEach @@ -342,11 +347,9 @@ void assertThreadLocalsPresentInFlux(Supplier> chainSupplier) { void assertThreadLocalsPresentInFlux(Supplier> chainSupplier, boolean skipCoreSubscriber) { assertThreadLocalsPresent(chainSupplier.get()); - assertThatNoException().isThrownBy(() -> - assertThatThreadLocalsPresentDirectRawSubscribe(chainSupplier.get())); + assertThatThreadLocalsPresentDirectRawSubscribe(chainSupplier.get()); if (!skipCoreSubscriber) { - assertThatNoException().isThrownBy(() -> - assertThatThreadLocalsPresentDirectCoreSubscribe(chainSupplier.get())); + assertThatThreadLocalsPresentDirectCoreSubscribe(chainSupplier.get()); } } @@ -357,11 +360,9 @@ void assertThreadLocalsPresentInMono(Supplier> chainSupplier) { void assertThreadLocalsPresentInMono(Supplier> chainSupplier, boolean skipCoreSubscriber) { assertThreadLocalsPresent(chainSupplier.get()); - assertThatNoException().isThrownBy(() -> - assertThatThreadLocalsPresentDirectRawSubscribe(chainSupplier.get())); + assertThatThreadLocalsPresentDirectRawSubscribe(chainSupplier.get()); if (!skipCoreSubscriber) { - assertThatNoException().isThrownBy(() -> - assertThatThreadLocalsPresentDirectCoreSubscribe(chainSupplier.get())); + assertThatThreadLocalsPresentDirectCoreSubscribe(chainSupplier.get()); } } @@ -371,26 +372,37 @@ void assertThreadLocalsPresent(Flux chain) { AtomicReference tlInOnError = new AtomicReference<>(); AtomicBoolean hadNext = new AtomicBoolean(false); - AtomicBoolean hadError = new AtomicBoolean(false); + AtomicReference error = new AtomicReference<>(); - chain.doOnEach(signal -> { - if (signal.isOnNext()) { - tlInOnNext.set(REF.get()); - hadNext.set(true); - } else if (signal.isOnError()) { - tlInOnError.set(REF.get()); - hadError.set(true); - } else if (signal.isOnComplete()) { - tlInOnComplete.set(REF.get()); - } - }) - .contextWrite(Context.of(KEY, "present")) - .blockLast(); + try { + chain.doOnEach(signal -> { + if (signal.isOnNext()) { + tlInOnNext.set(REF.get()); + hadNext.set(true); + } + else if (signal.isOnError()) { + tlInOnError.set(REF.get()); + error.set(signal.getThrowable()); + } + else if (signal.isOnComplete()) { + tlInOnComplete.set(REF.get()); + } + }) + .contextWrite(Context.of(KEY, "present")) + .blockLast(Duration.ofMillis(5000)); + } catch (Exception e) { + if (e instanceof IllegalStateException) { + throw e; + } + assertThat(e).satisfiesAnyOf( + exception -> assertThat(exception).isEqualTo(error.get()), + exception -> assertThat(exception).hasCause(error.get())); + } if (hadNext.get()) { assertThat(tlInOnNext.get()).isEqualTo("present"); } - if (hadError.get()) { + if (error.get() != null) { assertThat(tlInOnError.get()).isEqualTo("present"); } else { assertThat(tlInOnComplete.get()).isEqualTo("present"); @@ -406,17 +418,20 @@ void assertThreadLocalsPresent(Mono chain) { AtomicBoolean hadError = new AtomicBoolean(false); chain.doOnEach(signal -> { - if (signal.isOnNext()) { - tlInOnNext.set(REF.get()); - hadNext.set(true); - } else if (signal.isOnError()) { - tlInOnError.set(REF.get()); - hadError.set(true); - } else if (signal.isOnComplete()) { - tlInOnComplete.set(REF.get()); - } - }) + if (signal.isOnNext()) { + tlInOnNext.set(REF.get()); + hadNext.set(true); + } + else if (signal.isOnError()) { + tlInOnError.set(REF.get()); + hadError.set(true); + } + else if (signal.isOnComplete()) { + tlInOnComplete.set(REF.get()); + } + }) .contextWrite(Context.of(KEY, "present")) + .onErrorComplete() .block(); if (hadNext.get()) { @@ -430,42 +445,36 @@ void assertThreadLocalsPresent(Mono chain) { } void assertThatThreadLocalsPresentDirectCoreSubscribe( - CorePublisher source) throws InterruptedException, TimeoutException { + CorePublisher source) { assertThatThreadLocalsPresentDirectCoreSubscribe(source, () -> {}); } void assertThatThreadLocalsPresentDirectCoreSubscribe( - CorePublisher source, Runnable asyncAction) throws InterruptedException, TimeoutException { - AtomicReference valueInOnNext = new AtomicReference<>(); - AtomicReference valueInOnComplete = new AtomicReference<>(); - AtomicReference valueInOnError = new AtomicReference<>(); - AtomicReference error = new AtomicReference<>(); - AtomicBoolean complete = new AtomicBoolean(); - AtomicBoolean hadNext = new AtomicBoolean(); - CountDownLatch latch = new CountDownLatch(1); + CorePublisher source, Runnable asyncAction) { + assertThatNoException().isThrownBy(() -> { + CoreSubscriberWithContext subscriberWithContext = new CoreSubscriberWithContext<>(); - CoreSubscriberWithContext subscriberWithContext = - new CoreSubscriberWithContext<>( - valueInOnNext, valueInOnComplete, valueInOnError, - error, latch, hadNext, complete); + source.subscribe(subscriberWithContext); - source.subscribe(subscriberWithContext); + executorService.submit(asyncAction) + .get(100, TimeUnit.MILLISECONDS); - executorService.submit(asyncAction); - - if (!latch.await(100, TimeUnit.MILLISECONDS)) { - throw new TimeoutException("timed out"); - } + if (!subscriberWithContext.latch.await(500, TimeUnit.MILLISECONDS)) { + throw new TimeoutException("timed out"); + } - if (hadNext.get()) { - assertThat(valueInOnNext.get()).isEqualTo("present"); - } - if (error.get() == null) { - assertThat(valueInOnComplete.get()).isEqualTo("present"); - assertThat(complete).isTrue(); - } else { - assertThat(valueInOnError.get()).isEqualTo("present"); - } + if (subscriberWithContext.hadNext.get()) { + assertThat(subscriberWithContext.valueInOnNext.get()).isEqualTo( + "present"); + } + if (subscriberWithContext.error.get() == null) { + assertThat(subscriberWithContext.valueInOnComplete.get()).isEqualTo("present"); + assertThat(subscriberWithContext.complete).isTrue(); + } + else { + assertThat(subscriberWithContext.valueInOnError.get()).isEqualTo("present"); + } + }); } // We force the use of subscribe(Subscriber) override instead of @@ -473,42 +482,35 @@ void assertThatThreadLocalsPresentDirectCoreSubscribe( // are able to wrap the Subscriber and restore ThreadLocal values for the // signals received downstream. void assertThatThreadLocalsPresentDirectRawSubscribe( - Publisher source) throws InterruptedException, TimeoutException { + Publisher source) { assertThatThreadLocalsPresentDirectRawSubscribe(source, () -> {}); } void assertThatThreadLocalsPresentDirectRawSubscribe( - Publisher source, Runnable asyncAction) throws InterruptedException, TimeoutException { - AtomicReference valueInOnNext = new AtomicReference<>(); - AtomicReference valueInOnComplete = new AtomicReference<>(); - AtomicReference valueInOnError = new AtomicReference<>(); - AtomicReference error = new AtomicReference<>(); - AtomicBoolean hadNext = new AtomicBoolean(); - AtomicBoolean complete = new AtomicBoolean(); - CountDownLatch latch = new CountDownLatch(1); + Publisher source, Runnable asyncAction) { + assertThatNoException().isThrownBy(() -> { + CoreSubscriberWithContext subscriberWithContext = new CoreSubscriberWithContext<>(); - CoreSubscriberWithContext subscriberWithContext = - new CoreSubscriberWithContext<>( - valueInOnNext, valueInOnComplete, valueInOnError, - error, latch, hadNext, complete); + source.subscribe(subscriberWithContext); - source.subscribe(subscriberWithContext); + executorService.submit(asyncAction) + .get(100, TimeUnit.MILLISECONDS); - executorService.submit(asyncAction); - - if (!latch.await(100, TimeUnit.MILLISECONDS)) { - throw new TimeoutException("timed out"); - } + if (!subscriberWithContext.latch.await(500, TimeUnit.MILLISECONDS)) { + throw new TimeoutException("timed out"); + } - if (hadNext.get()) { - assertThat(valueInOnNext.get()).isEqualTo("present"); - } - if (error.get() == null) { - assertThat(valueInOnComplete.get()).isEqualTo("present"); - assertThat(complete).isTrue(); - } else { - assertThat(valueInOnError.get()).isEqualTo("present"); - } + if (subscriberWithContext.hadNext.get()) { + assertThat(subscriberWithContext.valueInOnNext.get()).isEqualTo("present"); + } + if (subscriberWithContext.error.get() == null) { + assertThat(subscriberWithContext.valueInOnComplete.get()).isEqualTo("present"); + assertThat(subscriberWithContext.complete).isTrue(); + } + else { + assertThat(subscriberWithContext.valueInOnError.get()).isEqualTo("present"); + } + }); } // Fundamental tests for Flux @@ -528,42 +530,30 @@ void internalFluxFlatMapSubscribe() { @Test void internalFluxSubscribeNoFusion() { assertThreadLocalsPresentInFlux(() -> - Flux.just("hello") - .hide() + threadSwitchingFlux() .flatMap(item -> threadSwitchingFlux())); } @Test void directFluxSubscribeAsCoreSubscriber() throws InterruptedException, TimeoutException { - AtomicReference valueInOnNext = new AtomicReference<>(); - AtomicReference valueInOnComplete = new AtomicReference<>(); - AtomicReference valueInOnError = new AtomicReference<>(); - AtomicReference error = new AtomicReference<>(); - AtomicBoolean hadNext = new AtomicBoolean(); - AtomicBoolean complete = new AtomicBoolean(); - CountDownLatch latch = new CountDownLatch(1); - Flux flux = threadSwitchingFlux(); - CoreSubscriberWithContext subscriberWithContext = - new CoreSubscriberWithContext<>( - valueInOnNext, valueInOnComplete, valueInOnError, - error, latch, hadNext, complete); + CoreSubscriberWithContext subscriberWithContext = new CoreSubscriberWithContext<>(); flux.subscribe(subscriberWithContext); - if (!latch.await(100, TimeUnit.MILLISECONDS)) { + if (!subscriberWithContext.latch.await(100, TimeUnit.MILLISECONDS)) { throw new TimeoutException("timed out"); } - assertThat(error.get()).isNull(); - assertThat(complete.get()).isTrue(); + assertThat(subscriberWithContext.error.get()).isNull(); + assertThat(subscriberWithContext.complete.get()).isTrue(); // We can't do anything here. subscribe(CoreSubscriber) is abstract in // CoreSubscriber interface and we have no means to intercept the calls to // restore ThreadLocals. - assertThat(valueInOnNext.get()).isEqualTo("ref_init"); - assertThat(valueInOnComplete.get()).isEqualTo("ref_init"); + assertThat(subscriberWithContext.valueInOnNext.get()).isEqualTo("ref_init"); + assertThat(subscriberWithContext.valueInOnComplete.get()).isEqualTo("ref_init"); } // Fundamental tests for Mono @@ -581,36 +571,34 @@ void internalMonoFlatMapSubscribe() { } @Test - void directMonoSubscribeAsCoreSubscriber() throws InterruptedException, TimeoutException { - AtomicReference valueInOnNext = new AtomicReference<>(); - AtomicReference valueInOnComplete = new AtomicReference<>(); - AtomicReference valueInOnError = new AtomicReference<>(); - AtomicReference error = new AtomicReference<>(); - AtomicBoolean complete = new AtomicBoolean(); - AtomicBoolean hadNext = new AtomicBoolean(); - CountDownLatch latch = new CountDownLatch(1); + void internalMonoFlatMapSubscribeNoFusion() { + assertThreadLocalsPresentInMono(() -> + Mono.just("hello") + .hide() + .flatMap(item -> threadSwitchingMono())); + } + @Test + void directMonoSubscribeAsCoreSubscriber() throws InterruptedException, TimeoutException { Mono mono = new ThreadSwitchingMono<>("Hello", executorService); CoreSubscriberWithContext subscriberWithContext = - new CoreSubscriberWithContext<>( - valueInOnNext, valueInOnComplete, valueInOnError, - error, latch, hadNext, complete); + new CoreSubscriberWithContext<>(); mono.subscribe(subscriberWithContext); - if (!latch.await(100, TimeUnit.MILLISECONDS)) { + if (!subscriberWithContext.latch.await(100, TimeUnit.MILLISECONDS)) { throw new TimeoutException("timed out"); } - assertThat(error.get()).isNull(); - assertThat(complete.get()).isTrue(); + assertThat(subscriberWithContext.error.get()).isNull(); + assertThat(subscriberWithContext.complete.get()).isTrue(); // We can't do anything here. subscribe(CoreSubscriber) is abstract in // CoreSubscriber interface and we have no means to intercept the calls to // restore ThreadLocals. - assertThat(valueInOnNext.get()).isEqualTo("ref_init"); - assertThat(valueInOnComplete.get()).isEqualTo("ref_init"); + assertThat(subscriberWithContext.valueInOnNext.get()).isEqualTo("ref_init"); + assertThat(subscriberWithContext.valueInOnComplete.get()).isEqualTo("ref_init"); } // Flux tests @@ -676,6 +664,20 @@ void fluxRetryWhenSwitchingThread() { .retryWhen(Retry.from(f -> threadSwitchingFlux()))); } + @Test + void fluxRepeatWhen() { + assertThreadLocalsPresentInFlux(() -> + threadSwitchingFlux() + .repeatWhen(s -> Flux.just(1))); + } + + @Test + void fluxRepeatWhenSwitchingThread() { + assertThreadLocalsPresentInFlux(() -> + Flux.just("Hello") + .repeatWhen(s -> threadSwitchingFlux())); + } + @Test void fluxWindowUntil() { assertThreadLocalsPresentInFlux(() -> @@ -748,9 +750,11 @@ void fluxConcatArray() { @Test void fluxConcatIterable() { - assertThreadLocalsPresentInFlux(() -> + assertThreadLocalsPresent( Flux.concat( Stream.of(Flux.empty(), threadSwitchingFlux()).collect(Collectors.toList()))); + + // Direct subscription } @Test @@ -789,6 +793,343 @@ void fluxZipIterable() { obj -> Tuples.of((String) obj[0], (String) obj[1]))); } + @Test + void fluxBufferBoundary() { + assertThreadLocalsPresentInFlux(() -> + Flux.just("Hello").delayElements(Duration.ofMillis(20)) + .buffer(threadSwitchingFlux())); + } + + @Test + void fluxBufferWhen() { + assertThreadLocalsPresentInFlux(() -> + Flux.just("hello").delayElements(Duration.ofMillis(20)) + .bufferWhen(threadSwitchingFlux(), x -> Flux.empty())); + } + + @Test + void fluxConcatMap() { + assertThreadLocalsPresentInFlux(() -> + threadSwitchingFlux() + .concatMap(s -> threadSwitchingFlux(), 1)); + } + + @Test + void fluxConcatMapNoPrefetch() { + assertThreadLocalsPresentInFlux(() -> + Flux.just("hello").hide() + .concatMap(s -> threadSwitchingFlux())); + } + + @Test + void fluxDelaySubscription() { + assertThreadLocalsPresentInFlux(() -> + Flux.just("hello") + .delaySubscription(threadSwitchingFlux())); + } + + @Test + void fluxExpand() { + AtomicBoolean done = new AtomicBoolean(false); + // We don't validate direct subscription via CoreSubscriber with Context in + // this case as it can happen that the drain loop is in the main thread + // and won't restore TLs from the Context when contextWrite operator is + // missing along the way in the chain. + assertThreadLocalsPresent( + Flux.just("hello").expand(s -> { + if (done.get()) { + return Flux.empty(); + } else { + done.set(true); + return threadSwitchingFlux(); + } + })); + } + + @Test + void fluxFilterWhen() { + // We don't validate direct subscription via CoreSubscriber with Context in + // this case as it can happen that the drain loop is in the main thread + // and won't restore TLs from the Context when contextWrite operator is + // missing along the way in the chain. + assertThreadLocalsPresent( + Flux.just("hello") + .filterWhen(s -> new ThreadSwitchingFlux<>(Boolean.TRUE, executorService))); + } + + @Test + void fluxGroupJoinFlattened() { + assertThreadLocalsPresentInFlux(() -> + Flux.just("hello").groupJoin(threadSwitchingFlux(), + l -> Flux.never(), r -> Flux.never(), + (s, f) -> f.map(i -> s)).flatMap(Function.identity())); + } + + @Test + void fluxGroupJoin() { + assertThreadLocalsPresent( + Flux.just("hello").groupJoin(threadSwitchingFlux(), + l -> Flux.never(), r -> Flux.never(), + (s, f) -> f.map(i -> s))); + + // works only with contextWrite because the group is delivered using the + // signal from the left hand side + } + + @Test + void fluxGroupJoinSubscribed() { + assertThreadLocalsPresentInFlux(() -> + Flux.just("hello").groupJoin(threadSwitchingFlux(), + l -> Flux.never(), r -> Flux.never(), + (s, f) -> f.map(i -> s)) + .flatMap(Function.identity())); + } + + @Disabled("Only contextWrite/contextCapture usages are supported") + @Test + void fluxJustRawSubscribe() { + assertThatNoException().isThrownBy(() -> + assertThatThreadLocalsPresentDirectRawSubscribe(Flux.just("hello")) + ); + } + + @Test + void fluxJoin() { + assertThreadLocalsPresentInFlux(() -> + Flux.just("hello").join(threadSwitchingFlux(), l -> Flux.never(), + r -> Flux.never(), (s1, s2) -> s1 + s2)); + } + + @Test + void fluxLift() { + assertThreadLocalsPresentInFlux(() -> { + Flux flux = Flux.just("Hello").hide(); + + Publisher lifted = + Operators.liftPublisher((pub, sub) -> new CoreSubscriber() { + @Override + public void onSubscribe(Subscription s) { + executorService.submit(() -> sub.onSubscribe(s)); + } + + @Override + public void onNext(String s) { + executorService.submit(() -> sub.onNext(s)); + } + + @Override + public void onError(Throwable t) { + executorService.submit(() -> sub.onError(t)); + } + + @Override + public void onComplete() { + executorService.submit(sub::onComplete); + } + + @Override + public Context currentContext() { + return sub.currentContext(); + } + }) + .apply(flux); + + return (Flux) lifted; + }); + } + + @Test + void fluxLiftFuseable() { + assertThreadLocalsPresentInFlux(() -> { + Flux flux = Flux.just("Hello"); + + Publisher lifted = + Operators.liftPublisher((pub, sub) -> new CoreSubscriber() { + @Override + public void onSubscribe(Subscription s) { + executorService.submit(() -> sub.onSubscribe(s)); + } + + @Override + public void onNext(String s) { + executorService.submit(() -> sub.onNext(s)); + } + + @Override + public void onError(Throwable t) { + executorService.submit(() -> sub.onError(t)); + } + + @Override + public void onComplete() { + executorService.submit(sub::onComplete); + } + }) + .apply(flux); + + return (Flux) lifted; + }); + } + + @Test + void fluxFlatMapSequential() { + assertThreadLocalsPresentInFlux(() -> + threadSwitchingFlux() + .flatMapSequential(s -> threadSwitchingFlux())); + } + + @Test + void fluxOnErrorResume() { + assertThreadLocalsPresentInFlux(() -> + Flux.error(new RuntimeException("Oops")) + .onErrorResume(t -> threadSwitchingFlux())); + } + + @Test + void fluxPublishMulticast() { + assertThreadLocalsPresentInFlux(() -> + Flux.just("Hello") + .publish(s -> threadSwitchingFlux())); + } + + @Test + void fluxSkipUntilOther() { + assertThreadLocalsPresentInFlux(() -> + threadSwitchingFlux() + .skipUntilOther(threadSwitchingFlux())); + } + + @Test + void fluxSample() { + assertThreadLocalsPresentInFlux(() -> + Flux.just("Hello").concatWith(Flux.never()) + .sample(threadSwitchingFlux())); + } + + @Test + void fluxSampleFirst() { + // We don't validate direct subscription via CoreSubscriber with Context in + // this case as it can happen that the drain loop is in the main thread + // and won't restore TLs from the Context when contextWrite operator is + // missing along the way in the chain. + assertThreadLocalsPresent( + Flux.just("Hello").concatWith(Flux.never()) + .sampleFirst(s -> new ThreadSwitchingFlux<>(new RuntimeException("oops"), executorService))); + } + + @Test + void fluxSampleTimeout() { + assertThreadLocalsPresentInFlux(() -> + threadSwitchingFlux().concatWith(Mono.delay(Duration.ofMillis(10)).map(l -> "").concatWith(Mono.empty())) + .sampleTimeout(s -> threadSwitchingFlux())); + } + + @Test + void fluxSwitchIfEmpty() { + assertThreadLocalsPresentInFlux(() -> + Flux.empty() + .switchIfEmpty(threadSwitchingFlux())); + } + + @Test + void fluxSwitchMapNoPrefetch() { + assertThreadLocalsPresentInFlux(() -> + threadSwitchingFlux() + .switchMap(s -> threadSwitchingFlux())); + } + + @Test + void fluxSwitchMap() { + assertThreadLocalsPresentInFlux(() -> + threadSwitchingFlux() + .switchMap(s -> threadSwitchingFlux(), 1)); + } + + @Test + void fluxTakeUntilOther() { + // We don't validate direct subscription via CoreSubscriber with Context in + // this case as it can happen that the drain loop is in the main thread + // and won't restore TLs from the Context when contextWrite operator is + // missing along the way in the chain. + assertThreadLocalsPresent( + Flux.concat(Flux.just("Hello"), Flux.never()) + .takeUntilOther(threadSwitchingFlux())); + } + + @Test + void fluxTimeoutFirst() { + assertThreadLocalsPresentInFlux(() -> + Flux.never() + .timeout(threadSwitchingFlux())); + } + + @Test + void fluxTimeoutOther() { + assertThreadLocalsPresentInFlux(() -> + Flux.never() + .timeout(threadSwitchingFlux(), i -> Flux.never(), threadSwitchingFlux())); + } + + @Test + void fluxWindowBoundary() { + assertThreadLocalsPresentInFlux(() -> + Flux.just("Hello").delayElements(Duration.ofMillis(20)) + .window(threadSwitchingFlux())); + } + + @Test + void fluxWindowBoundaryFlattened() { + assertThreadLocalsPresentInFlux(() -> + Flux.just("Hello").delayElements(Duration.ofMillis(20)) + .window(threadSwitchingFlux()) + .flatMap(Function.identity())); + } + + @Test + @Disabled("Publisher delivering the window has no notion of Context so nothing " + + "can be restored in onNext") + void fluxWindowWhen() { + assertThreadLocalsPresent( + threadSwitchingFlux() + .windowWhen(threadSwitchingFlux(), s -> threadSwitchingFlux())); + } + + @Test + @Disabled("Publisher delivering the window has no notion of Context so nothing " + + "can be restored in onNext") + void fluxDelayedWindowWhen() { + assertThreadLocalsPresentInFlux(() -> + Flux.just("Hello").delayElements(Duration.ofMillis(100)) + .windowWhen(threadSwitchingFlux(), s -> threadSwitchingFlux())); + } + + @Test + @Disabled("Publisher completing the window has no notion of Context so nothing " + + "can be restored in onComplete") + void fluxWindowWhenFlatMapped() { + assertThreadLocalsPresentInFlux(() -> + Flux.just("Hello").delayElements(Duration.ofMillis(100)) + .windowWhen(threadSwitchingFlux(), s -> threadSwitchingFlux()) + .flatMap(Function.identity())); + } + + @Test + void fluxWithLatestFrom() { + // We don't validate direct subscription via CoreSubscriber with Context in + // this case as it can happen that the drain loop is in the main thread + // and won't restore TLs from the Context when contextWrite operator is + // missing along the way in the chain. + assertThreadLocalsPresent( + Flux.just("Hello") + .withLatestFrom(threadSwitchingFlux(), (s1, s2) -> s1)); + } + + @Test + void continuationBrokenByThreadSwitch() { + assertThreadLocalsPresentInFlux(() -> + Flux.concat(Mono.empty(), threadSwitchingMono().retry())); + } + // Mono tests @Test @@ -940,6 +1281,140 @@ void monoUsingWhen() { s -> Mono.empty())); } + @Test + void monoFlatMapMany() { + assertThreadLocalsPresentInFlux(() -> + Mono.just("hello") + .hide() + .flatMapMany(item -> threadSwitchingFlux())); + } + + @Test + void monoFlatMapManyFuseable() { + assertThreadLocalsPresentInFlux(() -> + Mono.just("hello") + .flatMapMany(item -> threadSwitchingFlux())); + } + + @Test + void monoDelaySubscription() { + assertThreadLocalsPresentInMono(() -> + Mono.just("Hello").delaySubscription(threadSwitchingMono())); + } + + @Test + void monoFilterWhen() { + assertThreadLocalsPresentInMono(() -> + Mono.just("Hello").hide() + .filterWhen(s -> new ThreadSwitchingMono<>(Boolean.TRUE, executorService))); + } + + @Test + void monoLift() { + assertThreadLocalsPresentInMono(() -> { + Mono mono = Mono.just("Hello").hide(); + + Publisher lifted = + Operators.liftPublisher((pub, sub) -> new CoreSubscriber() { + @Override + public void onSubscribe(Subscription s) { + executorService.submit(() -> sub.onSubscribe(s)); + } + + @Override + public void onNext(String s) { + executorService.submit(() -> sub.onNext(s)); + } + + @Override + public void onError(Throwable t) { + executorService.submit(() -> sub.onError(t)); + } + + @Override + public void onComplete() { + executorService.submit(sub::onComplete); + } + }) + .apply(mono); + + return (Mono) lifted; + }); + } + + @Test + void monoLiftFuseable() { + assertThreadLocalsPresentInMono(() -> { + Mono mono = Mono.just("Hello"); + + Publisher lifted = + Operators.liftPublisher((pub, sub) -> new CoreSubscriber() { + @Override + public void onSubscribe(Subscription s) { + executorService.submit(() -> sub.onSubscribe(s)); + } + + @Override + public void onNext(String s) { + executorService.submit(() -> sub.onNext(s)); + } + + @Override + public void onError(Throwable t) { + executorService.submit(() -> sub.onError(t)); + } + + @Override + public void onComplete() { + executorService.submit(sub::onComplete); + } + }) + .apply(mono); + + return (Mono) lifted; + }); + } + + @Test + void monoOnErrorResume() { + assertThreadLocalsPresentInMono(() -> + Mono.error(new RuntimeException("oops")) + .onErrorResume(e -> threadSwitchingMono())); + } + + @Test + void monoPublishMulticast() { + assertThreadLocalsPresentInMono(() -> + Mono.just("Hello") + .publish(s -> threadSwitchingMono())); + } + + @Test + void monoSwitchIfEmpty() { + assertThreadLocalsPresentInMono(() -> + Mono.empty() + .switchIfEmpty(threadSwitchingMono())); + } + + @Test + void monoTakeUntilOther() { + assertThreadLocalsPresentInMono(() -> + Mono.delay(Duration.ofDays(1)).then(Mono.just("Hello")) + .takeUntilOther(threadSwitchingMono())); + } + + @Test + void monoTimeoutFirst() { + assertThreadLocalsPresentInMono(() -> + Mono.never().timeout(threadSwitchingMono())); + } + + @Test + void monoTimeoutFallback() { + assertThreadLocalsPresentInMono(() -> + Mono.never().timeout(threadSwitchingMono(), threadSwitchingMono())); + } + // ParallelFlux tests @Test @@ -1004,7 +1479,7 @@ void parallelFluxLiftFuseable() { assertThreadLocalsPresentInFlux(() -> { ParallelFlux> parallelFlux = ParallelFlux.from(Flux.just("Hello")) - .collect(ArrayList::new, ArrayList::add); + .collect(ArrayList::new, ArrayList::add); Publisher> lifted = Operators., ArrayList>liftPublisher((pub, sub) -> new CoreSubscriber>() { @@ -1147,7 +1622,7 @@ void sink() throws InterruptedException, TimeoutException { } @Test - void sinkDirect() throws InterruptedException, TimeoutException { + void sinkDirect() throws InterruptedException, TimeoutException, ExecutionException { Sinks.One sink1 = Sinks.one(); assertThatThreadLocalsPresentDirectCoreSubscribe(sink1.asMono(), () -> sink1.tryEmitValue("Hello")); @@ -1402,29 +1877,22 @@ void printInterestingClasses() throws Exception { private class CoreSubscriberWithContext implements CoreSubscriber { - private final AtomicReference valueInOnNext; - private final AtomicReference valueInOnComplete; - private final AtomicReference valueInOnError; - private final AtomicReference error; - private final CountDownLatch latch; - private final AtomicBoolean complete; - private final AtomicBoolean hadNext; - - public CoreSubscriberWithContext( - AtomicReference valueInOnNext, - AtomicReference valueInOnComplete, - AtomicReference valueInOnError, - AtomicReference error, - CountDownLatch latch, - AtomicBoolean hadNext, - AtomicBoolean complete) { - this.valueInOnNext = valueInOnNext; - this.valueInOnComplete = valueInOnComplete; - this.valueInOnError = valueInOnError; - this.error = error; - this.latch = latch; - this.hadNext = hadNext; - this.complete = complete; + final AtomicReference valueInOnNext; + final AtomicReference valueInOnComplete; + final AtomicReference valueInOnError; + final AtomicReference error; + final CountDownLatch latch; + final AtomicBoolean complete; + final AtomicBoolean hadNext; + + public CoreSubscriberWithContext() { + this.valueInOnNext = new AtomicReference<>(); + this.valueInOnComplete = new AtomicReference<>(); + this.valueInOnError = new AtomicReference<>(); + this.error = new AtomicReference<>(); + this.complete = new AtomicBoolean(); + this.hadNext = new AtomicBoolean(); + this.latch = new CountDownLatch(1); } @Override @@ -1530,6 +1998,42 @@ void fluxFlatMapToPublisher() throws InterruptedException, ExecutionException { executorService.shutdownNow(); } + @Test + void monoFlatMapToPublisher() throws InterruptedException, ExecutionException { + ExecutorService executorService = Executors.newSingleThreadExecutor(); + AtomicReference value = new AtomicReference<>(); + + TestPublisher testPublisher = TestPublisher.create(); + Publisher nonReactorPublisher = testPublisher; + + Mono.just("hello") + .hide() + .flatMapMany(s -> nonReactorPublisher) + .doOnNext(s -> value.set(REF.get())) + .contextWrite(Context.of(KEY, "present")) + .subscribe(); + + executorService + .submit(() -> testPublisher.emit("test").complete()) + .get(); + + testPublisher.assertWasSubscribed(); + testPublisher.assertWasNotCancelled(); + testPublisher.assertWasRequested(); + assertThat(value.get()).isEqualTo("present"); + + // validate there are no leftovers for other tasks to be attributed to + // previous values + executorService.submit(() -> value.set(REF.get())).get(); + + assertThat(value.get()).isEqualTo("ref_init"); + + // validate the current Thread does not have the value set either + assertThat(REF.get()).isEqualTo("ref_init"); + + executorService.shutdownNow(); + } + @Test void monoFromPublisher() throws InterruptedException, ExecutionException { ExecutorService executorService = Executors.newSingleThreadExecutor(); diff --git a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/ThreadSwitchingFlux.java b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/ThreadSwitchingFlux.java index 90bb0ad95a..4978bdae48 100644 --- a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/ThreadSwitchingFlux.java +++ b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/ThreadSwitchingFlux.java @@ -26,11 +26,19 @@ public class ThreadSwitchingFlux extends Flux implements Subscription, Run private final ExecutorService executorService; private final T item; + private final Throwable error; private CoreSubscriber actual; AtomicBoolean done = new AtomicBoolean(); public ThreadSwitchingFlux(T item, ExecutorService executorService) { this.item = item; + this.error = null; + this.executorService = executorService; + } + + public ThreadSwitchingFlux(Throwable error, ExecutorService executorService) { + this.item = null; + this.error = error; this.executorService = executorService; } @@ -47,7 +55,12 @@ public void run() { private void deliver() { if (done.compareAndSet(false, true)) { - this.actual.onNext(this.item); + if (this.item != null) { + this.actual.onNext(this.item); + } + if (this.error != null) { + this.actual.onError(this.error); + } this.executorService.submit(this.actual::onComplete); } } diff --git a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/ThreadSwitchingMono.java b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/ThreadSwitchingMono.java index 9b52dcea52..9dd458c7d5 100644 --- a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/ThreadSwitchingMono.java +++ b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/ThreadSwitchingMono.java @@ -26,11 +26,19 @@ public class ThreadSwitchingMono extends Mono implements Subscription, Run private final ExecutorService executorService; private final T item; + private final Throwable error; private CoreSubscriber actual; AtomicBoolean done = new AtomicBoolean(); public ThreadSwitchingMono(T item, ExecutorService executorService) { this.item = item; + this.error = null; + this.executorService = executorService; + } + + public ThreadSwitchingMono(ExecutorService executorService, Throwable error) { + this.item = null; + this.error = error; this.executorService = executorService; } @@ -47,7 +55,12 @@ public void run() { private void deliver() { if (done.compareAndSet(false, true)) { - this.actual.onNext(this.item); + if (this.item != null) { + this.actual.onNext(this.item); + } + if (this.error != null) { + this.actual.onError(this.error); + } this.executorService.submit(this.actual::onComplete); } }