diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoDelayUntil.java b/reactor-core/src/main/java/reactor/core/publisher/MonoDelayUntil.java index 26ca6c1aad..51eb2610e6 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoDelayUntil.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoDelayUntil.java @@ -94,7 +94,7 @@ MonoDelayUntil copyWithNewTriggerGenerator(boolean delayError, @Override public void subscribe(CoreSubscriber actual) { try { - source.subscribe(subscribeOrReturn(actual)); + Operators.toFluxOrMono(source).subscribe(subscribeOrReturn(actual)); } catch (Throwable e) { Operators.error(actual, Operators.onOperatorError(e, actual.currentContext())); diff --git a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/AutomaticContextPropagationTest.java b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/AutomaticContextPropagationTest.java index 04412a659a..d31aa38756 100644 --- a/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/AutomaticContextPropagationTest.java +++ b/reactor-core/src/withMicrometerTest/java/reactor/core/publisher/AutomaticContextPropagationTest.java @@ -18,7 +18,6 @@ import java.io.File; import java.time.Duration; -import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -35,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -43,6 +43,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.reactivestreams.Publisher; @@ -61,6 +62,7 @@ import reactor.util.retry.Retry; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; public class AutomaticContextPropagationTest { @@ -234,7 +236,6 @@ void contextCapturePropagatedAutomaticallyToAllSignals() throws InterruptedExcep // disabling prefetching to observe cancellation .publishOn(Schedulers.parallel(), 1) .doOnNext(i -> { - System.out.println(REF.get()); secondNextTlValue.set(REF.get()); itemDelivered.countDown(); }) @@ -316,7 +317,7 @@ class NonReactorFluxOrMono { @BeforeEach void enableAutomaticContextPropagation() { - executorService = Executors.newSingleThreadExecutor(); + executorService = Executors.newFixedThreadPool(3); } @AfterEach @@ -326,148 +327,224 @@ void cleanupThreadLocals() { // Scaffold methods - void assertThreadLocalPresentInOnNext(Mono chain) { - AtomicReference value = new AtomicReference<>(); + private ThreadSwitchingFlux threadSwitchingFlux() { + return new ThreadSwitchingFlux<>("Hello", executorService); + } - chain.doOnNext(item -> value.set(REF.get())) - .contextWrite(Context.of(KEY, "present")) - .block(); + private ThreadSwitchingMono threadSwitchingMono() { + return new ThreadSwitchingMono<>("Hello", executorService); + } - assertThat(value.get()).isEqualTo("present"); + void assertThreadLocalsPresentInFlux(Supplier> chainSupplier) { + assertThreadLocalsPresentInFlux(chainSupplier, false); } - void assertThreadLocalPresentInOnSuccess(Mono chain) { - AtomicReference value = new AtomicReference<>(); + void assertThreadLocalsPresentInFlux(Supplier> chainSupplier, + boolean skipCoreSubscriber) { + assertThreadLocalsPresent(chainSupplier.get()); + assertThatNoException().isThrownBy(() -> + assertThatThreadLocalsPresentDirectRawSubscribe(chainSupplier.get())); + if (!skipCoreSubscriber) { + assertThatNoException().isThrownBy(() -> + assertThatThreadLocalsPresentDirectCoreSubscribe(chainSupplier.get())); + } + } + + void assertThreadLocalsPresentInMono(Supplier> chainSupplier) { + assertThreadLocalsPresentInMono(chainSupplier, false); + } - chain.doOnSuccess(item -> value.set(REF.get())) + void assertThreadLocalsPresentInMono(Supplier> chainSupplier, + boolean skipCoreSubscriber) { + assertThreadLocalsPresent(chainSupplier.get()); + assertThatNoException().isThrownBy(() -> + assertThatThreadLocalsPresentDirectRawSubscribe(chainSupplier.get())); + if (!skipCoreSubscriber) { + assertThatNoException().isThrownBy(() -> + assertThatThreadLocalsPresentDirectCoreSubscribe(chainSupplier.get())); + } + } + + void assertThreadLocalsPresent(Flux chain) { + AtomicReference tlInOnNext = new AtomicReference<>(); + AtomicReference tlInOnComplete = new AtomicReference<>(); + AtomicReference tlInOnError = new AtomicReference<>(); + + AtomicBoolean hadNext = new AtomicBoolean(false); + AtomicBoolean hadError = new AtomicBoolean(false); + + chain.doOnEach(signal -> { + if (signal.isOnNext()) { + tlInOnNext.set(REF.get()); + hadNext.set(true); + } else if (signal.isOnError()) { + tlInOnError.set(REF.get()); + hadError.set(true); + } else if (signal.isOnComplete()) { + tlInOnComplete.set(REF.get()); + } + }) .contextWrite(Context.of(KEY, "present")) - .block(); + .blockLast(); - assertThat(value.get()).isEqualTo("present"); + if (hadNext.get()) { + assertThat(tlInOnNext.get()).isEqualTo("present"); + } + if (hadError.get()) { + assertThat(tlInOnError.get()).isEqualTo("present"); + } else { + assertThat(tlInOnComplete.get()).isEqualTo("present"); + } } - void assertThreadLocalPresentInOnError(Mono chain) { - AtomicReference value = new AtomicReference<>(); + void assertThreadLocalsPresent(Mono chain) { + AtomicReference tlInOnNext = new AtomicReference<>(); + AtomicReference tlInOnComplete = new AtomicReference<>(); + AtomicReference tlInOnError = new AtomicReference<>(); - chain.doOnError(item -> value.set(REF.get())) + AtomicBoolean hadNext = new AtomicBoolean(false); + AtomicBoolean hadError = new AtomicBoolean(false); + + chain.doOnEach(signal -> { + if (signal.isOnNext()) { + tlInOnNext.set(REF.get()); + hadNext.set(true); + } else if (signal.isOnError()) { + tlInOnError.set(REF.get()); + hadError.set(true); + } else if (signal.isOnComplete()) { + tlInOnComplete.set(REF.get()); + } + }) .contextWrite(Context.of(KEY, "present")) - .onErrorComplete() .block(); - assertThat(value.get()).isEqualTo("present"); + if (hadNext.get()) { + assertThat(tlInOnNext.get()).isEqualTo("present"); + } + if (hadError.get()) { + assertThat(tlInOnError.get()).isEqualTo("present"); + } else { + assertThat(tlInOnComplete.get()).isEqualTo("present"); + } } - void assertThatThreadLocalsPresentDirectCoreSubscribe(CorePublisher source) throws InterruptedException { - AtomicReference value = new AtomicReference<>(); + void assertThatThreadLocalsPresentDirectCoreSubscribe( + CorePublisher source) throws InterruptedException { + assertThatThreadLocalsPresentDirectCoreSubscribe(source, () -> {}); + } + + void assertThatThreadLocalsPresentDirectCoreSubscribe( + CorePublisher source, Runnable asyncAction) throws InterruptedException { + AtomicReference valueInOnNext = new AtomicReference<>(); + AtomicReference valueInOnComplete = new AtomicReference<>(); + AtomicReference valueInOnError = new AtomicReference<>(); AtomicReference error = new AtomicReference<>(); AtomicBoolean complete = new AtomicBoolean(); + AtomicBoolean hadNext = new AtomicBoolean(); CountDownLatch latch = new CountDownLatch(1); CoreSubscriberWithContext subscriberWithContext = - new CoreSubscriberWithContext<>(value, error, latch, complete); + new CoreSubscriberWithContext<>( + valueInOnNext, valueInOnComplete, valueInOnError, + error, latch, hadNext, complete); source.subscribe(subscriberWithContext); + executorService.submit(asyncAction); + latch.await(10, TimeUnit.MILLISECONDS); - assertThat(error.get()).isNull(); - assertThat(complete.get()).isTrue(); - assertThat(value.get()).isEqualTo("present"); + if (hadNext.get()) { + assertThat(valueInOnNext.get()).isEqualTo("present"); + } + if (error.get() == null) { + assertThat(valueInOnComplete.get()).isEqualTo("present"); + assertThat(complete).isTrue(); + } else { + assertThat(valueInOnError.get()).isEqualTo("present"); + } } - void assertThreadLocalPresentInOnNext(Flux chain) { - AtomicReference value = new AtomicReference<>(); + // We force the use of subscribe(Subscriber) override instead of + // subscribe(CoreSubscriber), and we can observe that for such a case we + // are able to wrap the Subscriber and restore ThreadLocal values for the + // signals received downstream. + void assertThatThreadLocalsPresentDirectRawSubscribe( + Publisher source) throws InterruptedException { + assertThatThreadLocalsPresentDirectRawSubscribe(source, () -> {}); + } - chain.doOnNext(item -> value.set(REF.get())) - .contextWrite(Context.of(KEY, "present")) - .blockLast(); + void assertThatThreadLocalsPresentDirectRawSubscribe( + Publisher source, Runnable asyncAction) throws InterruptedException { + AtomicReference valueInOnNext = new AtomicReference<>(); + AtomicReference valueInOnComplete = new AtomicReference<>(); + AtomicReference valueInOnError = new AtomicReference<>(); + AtomicReference error = new AtomicReference<>(); + AtomicBoolean hadNext = new AtomicBoolean(); + AtomicBoolean complete = new AtomicBoolean(); + CountDownLatch latch = new CountDownLatch(1); - assertThat(value.get()).isEqualTo("present"); - } + CoreSubscriberWithContext subscriberWithContext = + new CoreSubscriberWithContext<>( + valueInOnNext, valueInOnComplete, valueInOnError, + error, latch, hadNext, complete); - void assertThreadLocalPresentInOnComplete(Flux chain) { - AtomicReference value = new AtomicReference<>(); + source.subscribe(subscriberWithContext); - chain.doOnComplete(() -> value.set(REF.get())) - .contextWrite(Context.of(KEY, "present")) - .blockLast(); + executorService.submit(asyncAction); - assertThat(value.get()).isEqualTo("present"); + latch.await(10, TimeUnit.MILLISECONDS); + + if (hadNext.get()) { + assertThat(valueInOnNext.get()).isEqualTo("present"); + } + if (error.get() == null) { + assertThat(valueInOnComplete.get()).isEqualTo("present"); + assertThat(complete).isTrue(); + } else { + assertThat(valueInOnError.get()).isEqualTo("present"); + } } // Fundamental tests for Flux @Test - void chainedFluxSubscribe() { - ThreadSwitchingFlux chain = new ThreadSwitchingFlux<>("Hello", executorService); - assertThreadLocalPresentInOnNext(chain); + void fluxSubscribe() { + assertThreadLocalsPresentInFlux(this::threadSwitchingFlux, true); } @Test - void internalFluxSubscribe() { - ThreadSwitchingFlux inner = new ThreadSwitchingFlux<>("Hello", executorService); - Flux chain = Flux.just("hello").flatMap(item -> inner); - - assertThreadLocalPresentInOnNext(chain); + void internalFluxFlatMapSubscribe() { + assertThreadLocalsPresentInFlux(() -> + Flux.just("hello") + .flatMap(item -> threadSwitchingFlux())); } @Test void internalFluxSubscribeNoFusion() { - ThreadSwitchingFlux inner = new ThreadSwitchingFlux<>("Hello", executorService); - Flux chain = Flux.just("hello").hide().flatMap(item -> inner); - - assertThreadLocalPresentInOnNext(chain); - } - - @Test - void testFluxSubscriberAsRawSubscriber() throws InterruptedException { - AtomicReference value = new AtomicReference<>(); - - Flux flux = new ThreadSwitchingFlux<>("Hello", executorService); - - TestSubscriber testSubscriber = - TestSubscriber.builder().contextPut(KEY, "present").build(); - - flux - .doOnNext(i -> value.set(REF.get())) - .subscribe((Subscriber) testSubscriber); - - testSubscriber.block(Duration.ofMillis(10)); - assertThat(testSubscriber.expectTerminalSignal().isOnComplete()).isTrue(); - assertThat(value.get()).isEqualTo("present"); - } - - @Test - void testFluxSubscriberAsCoreSubscriber() throws InterruptedException { - AtomicReference value = new AtomicReference<>(); - - Flux flux = new ThreadSwitchingFlux<>("Hello", executorService); - - TestSubscriber testSubscriber = - TestSubscriber.builder().contextPut(KEY, "present").build(); - - flux - .doOnNext(i -> value.set(REF.get())) - .subscribe(testSubscriber); - - testSubscriber.block(Duration.ofMillis(10)); - assertThat(testSubscriber.expectTerminalSignal().isOnComplete()).isTrue(); - // Because of onNext in the chain, the internal operator implementation is - // able to wrap the subscriber and restore the ThreadLocal values - assertThat(value.get()).isEqualTo("present"); + assertThreadLocalsPresentInFlux(() -> + Flux.just("hello") + .hide() + .flatMap(item -> threadSwitchingFlux())); } @Test void directFluxSubscribeAsCoreSubscriber() throws InterruptedException { - AtomicReference value = new AtomicReference<>(); + AtomicReference valueInOnNext = new AtomicReference<>(); + AtomicReference valueInOnComplete = new AtomicReference<>(); + AtomicReference valueInOnError = new AtomicReference<>(); AtomicReference error = new AtomicReference<>(); + AtomicBoolean hadNext = new AtomicBoolean(); AtomicBoolean complete = new AtomicBoolean(); CountDownLatch latch = new CountDownLatch(1); - Flux flux = new ThreadSwitchingFlux<>("Hello", executorService); + Flux flux = threadSwitchingFlux(); CoreSubscriberWithContext subscriberWithContext = - new CoreSubscriberWithContext<>(value, error, latch, complete); + new CoreSubscriberWithContext<>( + valueInOnNext, valueInOnComplete, valueInOnError, + error, latch, hadNext, complete); flux.subscribe(subscriberWithContext); @@ -479,98 +556,40 @@ void directFluxSubscribeAsCoreSubscriber() throws InterruptedException { // We can't do anything here. subscribe(CoreSubscriber) is abstract in // CoreSubscriber interface and we have no means to intercept the calls to // restore ThreadLocals. - assertThat(value.get()).isEqualTo("ref_init"); - } - - @Test - void directFluxSubscribeAsRawSubscriber() throws InterruptedException { - AtomicReference value = new AtomicReference<>(); - AtomicReference error = new AtomicReference<>(); - AtomicBoolean complete = new AtomicBoolean(); - CountDownLatch latch = new CountDownLatch(1); - - Flux flux = new ThreadSwitchingFlux<>("Hello", executorService); - - CoreSubscriberWithContext subscriberWithContext = - new CoreSubscriberWithContext<>(value, error, latch, complete); - - // We force the use of subscribe(Subscriber) override instead of - // subscribe(CoreSubscriber), and we can observe that for such a case we - // are able to wrap the Subscriber and restore ThreadLocal values for the - // signals received downstream. - flux.subscribe((Subscriber) subscriberWithContext); - - latch.await(10, TimeUnit.MILLISECONDS); - - assertThat(error.get()).isNull(); - assertThat(complete.get()).isTrue(); - assertThat(value.get()).isEqualTo("present"); + assertThat(valueInOnNext.get()).isEqualTo("ref_init"); + assertThat(valueInOnComplete.get()).isEqualTo("ref_init"); } // Fundamental tests for Mono @Test - void chainedMonoSubscribe() { - Mono mono = new ThreadSwitchingMono<>("Hello", executorService); - assertThreadLocalPresentInOnNext(mono); - } - - @Test - void internalMonoSubscribe() { - Mono inner = new ThreadSwitchingMono<>("Hello", executorService); - Mono chain = Mono.just("hello").flatMap(item -> inner); - assertThreadLocalPresentInOnNext(chain); + void monoSubscribe() { + assertThreadLocalsPresentInMono(this::threadSwitchingMono, true); } @Test - void testMonoSubscriberAsRawSubscriber() throws InterruptedException { - AtomicReference value = new AtomicReference<>(); - - Mono mono = new ThreadSwitchingMono<>("Hello", executorService); - - TestSubscriber testSubscriber = - TestSubscriber.builder().contextPut(KEY, "present").build(); - - mono - .doOnNext(i -> value.set(REF.get())) - .subscribe((Subscriber) testSubscriber); - - testSubscriber.block(Duration.ofMillis(10)); - assertThat(testSubscriber.expectTerminalSignal().isOnComplete()).isTrue(); - assertThat(value.get()).isEqualTo("present"); - } - - @Test - void testMonoSubscriberAsCoreSubscriber() throws InterruptedException { - AtomicReference value = new AtomicReference<>(); - - Mono mono = new ThreadSwitchingMono<>("Hello", executorService); - - TestSubscriber testSubscriber = - TestSubscriber.builder().contextPut(KEY, "present").build(); - - mono - .doOnNext(i -> value.set(REF.get())) - .subscribe(testSubscriber); - - testSubscriber.block(Duration.ofMillis(10)); - assertThat(testSubscriber.expectTerminalSignal().isOnComplete()).isTrue(); - // Because of onNext in the chain, the internal operator implementation is - // able to wrap the subscriber and restore the ThreadLocal values - assertThat(value.get()).isEqualTo("present"); + void internalMonoFlatMapSubscribe() { + assertThreadLocalsPresentInMono(() -> + Mono.just("hello") + .flatMap(item -> threadSwitchingMono())); } @Test void directMonoSubscribeAsCoreSubscriber() throws InterruptedException { - AtomicReference value = new AtomicReference<>(); + AtomicReference valueInOnNext = new AtomicReference<>(); + AtomicReference valueInOnComplete = new AtomicReference<>(); + AtomicReference valueInOnError = new AtomicReference<>(); AtomicReference error = new AtomicReference<>(); AtomicBoolean complete = new AtomicBoolean(); + AtomicBoolean hadNext = new AtomicBoolean(); CountDownLatch latch = new CountDownLatch(1); Mono mono = new ThreadSwitchingMono<>("Hello", executorService); CoreSubscriberWithContext subscriberWithContext = - new CoreSubscriberWithContext<>(value, error, latch, complete); + new CoreSubscriberWithContext<>( + valueInOnNext, valueInOnComplete, valueInOnError, + error, latch, hadNext, complete); mono.subscribe(subscriberWithContext); @@ -582,635 +601,443 @@ void directMonoSubscribeAsCoreSubscriber() throws InterruptedException { // We can't do anything here. subscribe(CoreSubscriber) is abstract in // CoreSubscriber interface and we have no means to intercept the calls to // restore ThreadLocals. - assertThat(value.get()).isEqualTo("ref_init"); - } - - @Test - void directMonoSubscribeAsRawSubscriber() throws InterruptedException { - AtomicReference value = new AtomicReference<>(); - AtomicReference error = new AtomicReference<>(); - AtomicBoolean complete = new AtomicBoolean(); - CountDownLatch latch = new CountDownLatch(1); - - Mono mono = new ThreadSwitchingMono<>("Hello", executorService); - - CoreSubscriberWithContext subscriberWithContext = - new CoreSubscriberWithContext<>(value, error, latch, complete); - - // We force the use of subscribe(Subscriber) override instead of - // subscribe(CoreSubscriber), and we can observe that for such a case we - // are able to wrap the Subscriber and restore ThreadLocal values for the - // signals received downstream. - mono.subscribe((Subscriber) subscriberWithContext); - - latch.await(10, TimeUnit.MILLISECONDS); - - assertThat(error.get()).isNull(); - assertThat(complete.get()).isTrue(); - assertThat(value.get()).isEqualTo("present"); + assertThat(valueInOnNext.get()).isEqualTo("ref_init"); + assertThat(valueInOnComplete.get()).isEqualTo("ref_init"); } // Flux tests @Test void fluxCreate() { - Flux flux = Flux.create(sink -> { - executorService.submit(() -> { - sink.next("Hello"); - sink.complete(); - }); - }); - - assertThreadLocalPresentInOnNext(flux); - } - - @Test - void fluxCreateDirect() throws InterruptedException { - AtomicReference value = new AtomicReference<>(); - AtomicReference error = new AtomicReference<>(); - AtomicBoolean complete = new AtomicBoolean(); - CountDownLatch latch = new CountDownLatch(1); - - CoreSubscriberWithContext subscriberWithContext = - new CoreSubscriberWithContext<>(value, error, latch, complete); - - Publisher flux = Flux.create(sink -> { - executorService.submit(() -> { - sink.next("Hello"); - sink.complete(); - }); - }); - - flux.subscribe(subscriberWithContext); - - latch.await(10, TimeUnit.MILLISECONDS); + Supplier> fluxSupplier = + () -> Flux.create(sink -> executorService.submit(() -> { + sink.next("Hello"); + sink.complete(); + })); - assertThat(error.get()).isNull(); - assertThat(complete.get()).isTrue(); - assertThat(value.get()).isEqualTo("present"); + assertThreadLocalsPresentInFlux(fluxSupplier); } @Test void fluxMap() { - Flux flux = - new ThreadSwitchingFlux<>("Hello", executorService).map(String::toUpperCase); - assertThreadLocalPresentInOnNext(flux); + assertThreadLocalsPresentInFlux(() -> threadSwitchingFlux().map(String::toUpperCase)); } @Test void fluxIgnoreThenSwitchThread() { - Mono mono = new ThreadSwitchingMono<>("Hello", executorService); - Mono chain = Flux.just("Bye").then(mono); - assertThreadLocalPresentInOnNext(chain); + assertThreadLocalsPresentInMono(() -> Flux.just("Bye").then(threadSwitchingMono())); } @Test void fluxSwitchThreadThenIgnore() { - Flux flux = new ThreadSwitchingFlux<>("Ignored", executorService); - Mono chain = flux.then(Mono.just("Hello")); - assertThreadLocalPresentInOnNext(chain); + assertThreadLocalsPresentInMono(() -> threadSwitchingFlux().then(Mono.just("Hi"))); } @Test void fluxDeferContextual() { - Flux flux = new ThreadSwitchingFlux<>("Hello", executorService); - Flux chain = Flux.deferContextual(ctx -> flux); - assertThreadLocalPresentInOnNext(chain); + assertThreadLocalsPresentInFlux(() -> + Flux.deferContextual(ctx -> threadSwitchingFlux())); } @Test void fluxFirstWithSignalArray() { - Flux flux = new ThreadSwitchingFlux<>("Hello", executorService); - Flux chain = Flux.firstWithSignal(flux); - assertThreadLocalPresentInOnNext(chain); - - Flux other = new ThreadSwitchingFlux<>("Hello", executorService); - assertThreadLocalPresentInOnNext(chain.or(other)); + assertThreadLocalsPresentInFlux(() -> + Flux.firstWithSignal(threadSwitchingFlux())); + assertThreadLocalsPresentInFlux(() -> + Flux.firstWithSignal(threadSwitchingFlux()).or(threadSwitchingFlux())); } @Test void fluxFirstWithSignalIterable() { - Flux flux = new ThreadSwitchingFlux<>("Hello", executorService); - Flux chain = Flux.firstWithSignal(Collections.singletonList(flux)); - assertThreadLocalPresentInOnNext(chain); - - Flux other1 = new ThreadSwitchingFlux<>("Hello", executorService); - Flux other2 = new ThreadSwitchingFlux<>("Hello", executorService); - List> list = Stream.of(other1, other2).collect(Collectors.toList()); - assertThreadLocalPresentInOnNext(Flux.firstWithSignal(list)); + assertThreadLocalsPresentInFlux(() -> + Flux.firstWithSignal(Collections.singletonList(threadSwitchingFlux()))); + assertThreadLocalsPresentInFlux(() -> + Flux.firstWithSignal(Stream.of(threadSwitchingFlux(), threadSwitchingFlux()).collect(Collectors.toList()))); } @Test void fluxRetryWhen() { - Flux flux = - new ThreadSwitchingFlux<>("Hello", executorService).retryWhen(Retry.max(1)); - - assertThreadLocalPresentInOnNext(flux); + assertThreadLocalsPresentInFlux(() -> + threadSwitchingFlux().retryWhen(Retry.max(1))); } @Test void fluxRetryWhenSwitchingThread() { - Flux flux = + assertThreadLocalsPresentInFlux(() -> Flux.error(new RuntimeException("Oops")) - .retryWhen(Retry.from(f -> new ThreadSwitchingFlux<>( - "Hello", executorService))); - - assertThreadLocalPresentInOnComplete(flux); + .retryWhen(Retry.from(f -> threadSwitchingFlux()))); } @Test void fluxWindowUntil() { - Flux flux = - new ThreadSwitchingFlux<>("Hello", executorService) - .windowUntil(s -> true) - .flatMap(Function.identity()); - - assertThreadLocalPresentInOnNext(flux); + assertThreadLocalsPresentInFlux(() -> + threadSwitchingFlux().windowUntil(s -> true) + .flatMap(Function.identity())); } @Test void switchOnFirst() { - Flux flux = new ThreadSwitchingFlux<>("Hello", executorService) - .switchOnFirst((s, f) -> f.map(String::toUpperCase)); - - assertThreadLocalPresentInOnNext(flux); + assertThreadLocalsPresentInFlux(() -> + threadSwitchingFlux() + .switchOnFirst((s, f) -> f.map(String::toUpperCase))); } @Test void switchOnFirstFuseable() { - Flux flux = new ThreadSwitchingFlux<>("Hello", executorService) - .filter("Hello"::equals) - .switchOnFirst((s, f) -> f.map(String::toUpperCase)); - - assertThreadLocalPresentInOnNext(flux); + assertThreadLocalsPresentInFlux(() -> + threadSwitchingFlux() + .filter("Hello"::equals) + .switchOnFirst((s, f) -> f.map(String::toUpperCase))); } @Test void switchOnFirstSwitchThread() { - Flux flux = new ThreadSwitchingFlux<>("Hello", executorService) - .switchOnFirst((s, f) -> new ThreadSwitchingFlux<>("Goodbye", executorService)); - - assertThreadLocalPresentInOnNext(flux); + assertThreadLocalsPresentInFlux(() -> + threadSwitchingFlux() + .switchOnFirst((s, f) -> threadSwitchingFlux())); } @Test void switchOnFirstFuseableSwitchThread() { - Flux flux = new ThreadSwitchingFlux<>("Hello", executorService) - .filter("Hello"::equals) - .switchOnFirst((s, f) -> new ThreadSwitchingFlux<>("Goodbye", executorService)); - - assertThreadLocalPresentInOnNext(flux); + assertThreadLocalsPresentInFlux(() -> + threadSwitchingFlux() + .filter("Hello"::equals) + .switchOnFirst((s, f) -> threadSwitchingFlux())); } @Test void fluxWindowTimeout() { - Flux> flux = new ThreadSwitchingFlux<>("Hello", executorService) - .windowTimeout(1, Duration.ofDays(1), true); - - assertThreadLocalPresentInOnNext(flux); - } - - @Test - void fluxWindowTimeoutDirect() throws InterruptedException { - Flux> flux = new ThreadSwitchingFlux<>("Hello", executorService) - .windowTimeout(1, Duration.ofDays(1), true); - - assertThatThreadLocalsPresentDirectCoreSubscribe(flux); + assertThreadLocalsPresentInFlux(() -> + threadSwitchingFlux() + .windowTimeout(1, Duration.ofDays(1), true)); } @Test void fluxMergeComparing() { - Flux flux = Flux.mergeComparing(Flux.empty(), - new ThreadSwitchingFlux<>("Hello", executorService)); - - assertThreadLocalPresentInOnNext(flux); - } - - @Test - void fluxMergeComparingDirect() throws InterruptedException { - Flux flux = Flux.mergeComparing(Flux.empty(), - new ThreadSwitchingFlux<>("Hello", executorService)); - - assertThatThreadLocalsPresentDirectCoreSubscribe(flux); + assertThreadLocalsPresentInFlux(() -> + Flux.mergeComparing(Flux.empty(), threadSwitchingFlux())); } @Test void fluxFirstWithValueArray() { - Flux flux = Flux.firstWithValue(Flux.empty(), - new ThreadSwitchingFlux<>("Hola", executorService)); - - assertThreadLocalPresentInOnNext(flux); + assertThreadLocalsPresentInFlux(() -> + Flux.firstWithValue(Flux.empty(), threadSwitchingFlux())); } @Test void fluxFirstWithValueIterable() { - List> list = Stream.of(Flux.empty(), - new ThreadSwitchingFlux<>("Hola", executorService)) - .collect(Collectors.toList()); - Flux flux = Flux.firstWithValue(list); - - assertThreadLocalPresentInOnNext(flux); + assertThreadLocalsPresentInFlux(() -> + Flux.firstWithValue( + Stream.of(Flux.empty(), threadSwitchingFlux()) + .collect(Collectors.toList()))); } @Test void fluxConcatArray() { - Flux flux = Flux.concat(Mono.empty(), - new ThreadSwitchingFlux<>("Hello", executorService)); - assertThreadLocalPresentInOnNext(flux); + assertThreadLocalsPresentInFlux(() -> + Flux.concat(Mono.empty(), threadSwitchingFlux())); } @Test void fluxConcatIterable() { - List> list = Stream.of(Flux.empty(), - new ThreadSwitchingFlux<>("Hello", executorService)) - .collect(Collectors.toList()); - - Flux flux = Flux.concat(list); - - assertThreadLocalPresentInOnNext(flux); + assertThreadLocalsPresentInFlux(() -> + Flux.concat( + Stream.of(Flux.empty(), threadSwitchingFlux()).collect(Collectors.toList()))); } @Test void fluxGenerate() { - Flux flux = Flux.generate(sink -> { + assertThreadLocalsPresentInFlux(() -> Flux.generate(sink -> { sink.next("Hello"); // the generator is checked if any signal was delivered by the consumer - // so we check at completion only + // so we perform asynchronous completion only executorService.submit(() -> { sink.complete(); }); - }); - - assertThreadLocalPresentInOnComplete(flux); + })); } @Test void fluxCombineLatest() { - Flux flux = Flux.combineLatest(Flux.just(""), - new ThreadSwitchingFlux<>("Hello", executorService), (s1, s2) -> s2); - - assertThreadLocalPresentInOnNext(flux); + assertThreadLocalsPresentInFlux(() -> + Flux.combineLatest( + Flux.just(""), threadSwitchingFlux(), (s1, s2) -> s2)); } @Test void fluxUsing() { - Flux flux = Flux.using(() -> 0, i -> new ThreadSwitchingFlux<>( - "Hello", executorService), i -> {}); - - assertThreadLocalPresentInOnNext(flux); + assertThreadLocalsPresentInFlux(() -> + Flux.using(() -> 0, i -> threadSwitchingFlux(), i -> {})); } @Test void fluxZip() { - Flux> flux = Flux.zip(Flux.just(""), - new ThreadSwitchingFlux<>("Hello", - executorService)); - - assertThreadLocalPresentInOnNext(flux); + assertThreadLocalsPresentInFlux(() -> + Flux.zip(Flux.just(""), threadSwitchingFlux())); } @Test void fluxZipIterable() { - List> list = Stream.of(Flux.just(""), - new ThreadSwitchingFlux<>("Hello", - executorService)).collect(Collectors.toList()); - - Flux> flux = Flux.zip(list, - obj -> Tuples.of((String) obj[0], (String) obj[1])); - - assertThreadLocalPresentInOnNext(flux); + assertThreadLocalsPresentInFlux(() -> + Flux.zip(Stream.of(Flux.just(""), threadSwitchingFlux()).collect(Collectors.toList()), + obj -> Tuples.of((String) obj[0], (String) obj[1]))); } // Mono tests @Test void monoCreate() { - Mono mono = Mono.create(sink -> { - executorService.submit(() -> { - sink.success("Hello"); - }); - }); - - assertThreadLocalPresentInOnNext(mono); - } - - @Test - void monoCreateDirect() throws InterruptedException { - AtomicReference value = new AtomicReference<>(); - AtomicReference error = new AtomicReference<>(); - AtomicBoolean complete = new AtomicBoolean(); - CountDownLatch latch = new CountDownLatch(1); - - CoreSubscriberWithContext subscriberWithContext = - new CoreSubscriberWithContext<>(value, error, latch, complete); - - Publisher mono = Mono.create(sink -> { - executorService.submit(() -> { - sink.success("Hello"); - }); - }); - - mono.subscribe(subscriberWithContext); - - latch.await(10, TimeUnit.MILLISECONDS); - - assertThat(error.get()).isNull(); - assertThat(complete.get()).isTrue(); - assertThat(value.get()).isEqualTo("present"); + assertThreadLocalsPresentInMono(() -> + Mono.create(sink -> { + executorService.submit(() -> { + sink.success("Hello"); + }); + })); } @Test void monoSwitchThreadIgnoreThen() { - Mono mono = new ThreadSwitchingMono<>("Hello", executorService); - Mono chain = mono.then(Mono.just("Bye")); - assertThreadLocalPresentInOnNext(chain); + assertThreadLocalsPresentInMono(() -> + threadSwitchingMono().then(Mono.just("Bye"))); } @Test void monoIgnoreThenSwitchThread() { - Mono mono = new ThreadSwitchingMono<>("Hello", executorService); - Mono chain = Mono.just("Bye").then(mono); - assertThreadLocalPresentInOnNext(chain); + assertThreadLocalsPresentInMono(() -> + Mono.just("Bye").then(threadSwitchingMono())); } @Test void monoSwitchThreadDelayUntil() { - Mono mono = new ThreadSwitchingMono<>("Hello", executorService); - Mono chain = mono.delayUntil(s -> Mono.delay(Duration.ofMillis(1))); - assertThreadLocalPresentInOnNext(chain); + assertThreadLocalsPresentInMono(() -> + threadSwitchingMono().delayUntil(s -> Mono.delay(Duration.ofMillis(1)))); } @Test void monoDelayUntilSwitchingThread() { - Mono mono = Mono.just("Hello"); - Mono chain = mono.delayUntil(s -> new ThreadSwitchingMono<>("Done", executorService)); - assertThreadLocalPresentInOnNext(chain); + assertThreadLocalsPresentInMono(() -> + Mono.just("Hello").delayUntil(s -> threadSwitchingMono())); } @Test void monoIgnoreSwitchingThread() { - Mono mono = Mono.ignoreElements(new ThreadSwitchingMono<>("Hello", executorService)); - assertThreadLocalPresentInOnSuccess(mono); + assertThreadLocalsPresentInMono(() -> + Mono.ignoreElements(threadSwitchingMono())); } @Test void monoDeferContextual() { - Mono mono = new ThreadSwitchingMono<>("Hello", executorService); - Mono chain = Mono.deferContextual(ctx -> mono); - assertThreadLocalPresentInOnNext(chain); + assertThreadLocalsPresentInMono(() -> + Mono.deferContextual(ctx -> threadSwitchingMono())); } @Test void monoDefer() { - Mono mono = new ThreadSwitchingMono<>("Hello", executorService); - Mono chain = Mono.defer(() -> mono); - assertThreadLocalPresentInOnNext(chain); + assertThreadLocalsPresentInMono(() -> + Mono.defer(this::threadSwitchingMono)); } @Test void monoFirstWithSignalArray() { - Mono mono = new ThreadSwitchingMono<>("Hello", executorService); - Mono chain = Mono.firstWithSignal(mono); - assertThreadLocalPresentInOnNext(chain); + assertThreadLocalsPresentInMono(() -> + Mono.firstWithSignal(threadSwitchingMono())); - Mono other = new ThreadSwitchingMono<>("Hello", executorService); - assertThreadLocalPresentInOnNext(chain.or(other)); + assertThreadLocalsPresentInMono(() -> + Mono.firstWithSignal(threadSwitchingMono()) + .or(threadSwitchingMono())); } @Test void monoFirstWithSignalIterable() { - Mono mono = new ThreadSwitchingMono<>("Hello", executorService); - Mono chain = Mono.firstWithSignal(Collections.singletonList(mono)); - assertThreadLocalPresentInOnNext(chain); + assertThreadLocalsPresentInMono(() -> + Mono.firstWithSignal(Collections.singletonList(threadSwitchingMono()))); - Mono other1 = new ThreadSwitchingMono<>("Hello", executorService); - Mono other2 = new ThreadSwitchingMono<>("Hello", executorService); - List> list = Stream.of(other1, other2).collect(Collectors.toList()); - assertThreadLocalPresentInOnNext(Mono.firstWithSignal(list)); + assertThreadLocalsPresentInMono(() -> + Mono.firstWithSignal( + Stream.of(threadSwitchingMono(), threadSwitchingMono()) + .collect(Collectors.toList()))); } @Test void monoFromFluxSingle() { - Flux flux = new ThreadSwitchingFlux<>("Hello", executorService); - Mono chain = flux.single(); - assertThreadLocalPresentInOnNext(chain); + assertThreadLocalsPresentInMono(() -> + threadSwitchingFlux().single()); } @Test void monoRetryWhen() { - Mono mono = - new ThreadSwitchingMono<>("Hello", executorService).retryWhen(Retry.max(1)); - - assertThreadLocalPresentInOnNext(mono); + assertThreadLocalsPresentInMono(() -> + threadSwitchingMono().retryWhen(Retry.max(1))); } @Test void monoRetryWhenSwitchingThread() { - Mono mono = + assertThreadLocalsPresentInMono(() -> Mono.error(new RuntimeException("Oops")) - .retryWhen(Retry.from(f -> new ThreadSwitchingMono<>( - "Hello", executorService))); - - assertThreadLocalPresentInOnSuccess(mono); + .retryWhen(Retry.from(f -> threadSwitchingMono()))); } @Test void monoUsing() { - Mono mono = Mono.using( - () -> "Hello", - seed -> new ThreadSwitchingMono<>("Hola", executorService), - seed -> {}, - false); - - assertThreadLocalPresentInOnNext(mono); + assertThreadLocalsPresentInMono(() -> + Mono.using(() -> "Hello", + seed -> threadSwitchingMono(), + seed -> {}, + false)); } @Test void monoFirstWithValueArray() { - Mono mono = Mono.firstWithValue(Mono.empty(), - new ThreadSwitchingMono<>("Hola", executorService)); - - assertThreadLocalPresentInOnNext(mono); + assertThreadLocalsPresentInMono(() -> + Mono.firstWithValue(Mono.empty(), threadSwitchingMono())); } @Test void monoFirstWithValueIterable() { - List> list = Stream.of(Mono.empty(), - new ThreadSwitchingMono<>("Hola", executorService)) - .collect(Collectors.toList()); - Mono mono = Mono.firstWithValue(list); - - assertThreadLocalPresentInOnNext(mono); + assertThreadLocalsPresentInMono(() -> + Mono.firstWithValue( + Stream.of(Mono.empty(), threadSwitchingMono()) + .collect(Collectors.toList()))); } @Test void monoZip() { - Mono> mono = Mono.zip(Mono.just(""), - new ThreadSwitchingMono<>("Hello", - executorService)); - - assertThreadLocalPresentInOnNext(mono); + assertThreadLocalsPresentInMono(() -> + Mono.zip(Mono.just(""), threadSwitchingMono())); } @Test void monoZipIterable() { - List> list = Stream.of(Mono.just(""), - new ThreadSwitchingMono<>("Hello", - executorService)).collect(Collectors.toList()); - - Mono> mono = Mono.zip(list, - obj -> Tuples.of((String) obj[0], (String) obj[1])); - - assertThreadLocalPresentInOnNext(mono); + assertThreadLocalsPresentInMono(() -> + Mono.zip( + Stream.of(Mono.just(""), threadSwitchingMono()) + .collect(Collectors.toList()), + obj -> Tuples.of((String) obj[0], (String) obj[1]))); } @Test void monoSequenceEqual() { - Mono mono = Mono.sequenceEqual(Mono.just("Hello"), - new ThreadSwitchingMono<>("Hello", executorService)); - - assertThreadLocalPresentInOnNext(mono); + assertThreadLocalsPresentInMono(() -> + Mono.sequenceEqual(Mono.just("Hello"), threadSwitchingMono())); } @Test void monoWhen() { - Mono mono = Mono.when(Mono.empty(), new ThreadSwitchingMono<>("Hello" - , executorService)); - - assertThreadLocalPresentInOnSuccess(mono); + assertThreadLocalsPresentInMono(() -> + Mono.when(Mono.empty(), threadSwitchingMono())); } @Test void monoUsingWhen() { - Mono mono = Mono.usingWhen(Mono.just("Hello"), s -> - new ThreadSwitchingMono<>(s, executorService), s -> Mono.empty()); - - assertThreadLocalPresentInOnNext(mono); + assertThreadLocalsPresentInMono(() -> + Mono.usingWhen(Mono.just("Hello"), s -> threadSwitchingMono(), + s -> Mono.empty())); } // ParallelFlux tests @Test void parallelFluxFromMonoToMono() { - Mono mono = new ThreadSwitchingMono<>("Hello", executorService); - Mono chain = Mono.from(ParallelFlux.from(mono)); - assertThreadLocalPresentInOnNext(chain); + assertThreadLocalsPresentInMono(() -> + Mono.from(ParallelFlux.from(threadSwitchingMono()))); } @Test void parallelFluxFromMonoToFlux() { - Mono mono = new ThreadSwitchingMono<>("Hello", executorService); - Flux chain = Flux.from(ParallelFlux.from(mono)); - assertThreadLocalPresentInOnNext(chain); + assertThreadLocalsPresentInFlux(() -> + Flux.from(ParallelFlux.from(threadSwitchingMono()))); } @Test void parallelFluxFromFluxToMono() { - Flux flux = new ThreadSwitchingFlux<>("Hello", executorService); - Mono chain = Mono.from(ParallelFlux.from(flux)); - assertThreadLocalPresentInOnNext(chain); + assertThreadLocalsPresentInMono(() -> + Mono.from(ParallelFlux.from(threadSwitchingFlux()))); } @Test void parallelFluxFromFluxToFlux() { - Flux flux = new ThreadSwitchingFlux<>("Hello", executorService); - Flux chain = Flux.from(ParallelFlux.from(flux)); - assertThreadLocalPresentInOnNext(chain); + assertThreadLocalsPresentInFlux(() -> + Flux.from(ParallelFlux.from(threadSwitchingFlux()))); } @Test void parallelFluxLift() { - ParallelFlux parallelFlux = - ParallelFlux.from(Flux.just("Hello")); + assertThreadLocalsPresentInFlux(() -> { + ParallelFlux parallelFlux = ParallelFlux.from(Flux.just("Hello")); - Publisher lifted = - Operators.liftPublisher((pub, sub) -> new CoreSubscriber() { - @Override - public void onSubscribe(Subscription s) { - executorService.submit(() -> sub.onSubscribe(s)); - } + Publisher lifted = + Operators.liftPublisher((pub, sub) -> new CoreSubscriber() { + @Override + public void onSubscribe(Subscription s) { + executorService.submit(() -> sub.onSubscribe(s)); + } - @Override - public void onNext(String s) { - executorService.submit(() -> sub.onNext(s)); - } + @Override + public void onNext(String s) { + executorService.submit(() -> sub.onNext(s)); + } - @Override - public void onError(Throwable t) { - executorService.submit(() -> sub.onError(t)); - } + @Override + public void onError(Throwable t) { + executorService.submit(() -> sub.onError(t)); + } - @Override - public void onComplete() { - executorService.submit(sub::onComplete); - } - }) - .apply(parallelFlux); + @Override + public void onComplete() { + executorService.submit(sub::onComplete); + } + }) + .apply(parallelFlux); - assertThreadLocalPresentInOnNext(((ParallelFlux) lifted).sequential()); + return ((ParallelFlux) lifted).sequential(); + }); } @Test void parallelFluxLiftFuseable() { - ParallelFlux> parallelFlux = - ParallelFlux.from(Flux.just("Hello")) - .collect(ArrayList::new, ArrayList::add); - - Publisher> lifted = Operators., ArrayList>liftPublisher( - (pub, sub) -> new CoreSubscriber>() { - @Override - public void onSubscribe(Subscription s) { - executorService.submit(() -> sub.onSubscribe(s)); - } - - @Override - public void onNext(ArrayList s) { - executorService.submit(() -> sub.onNext(s)); - } - - @Override - public void onError(Throwable t) { - executorService.submit(() -> sub.onError(t)); - } - - @Override - public void onComplete() { - executorService.submit(sub::onComplete); - } - }) - .apply(parallelFlux); - - assertThreadLocalPresentInOnNext(((ParallelFlux) lifted).sequential()); + assertThreadLocalsPresentInFlux(() -> { + ParallelFlux> parallelFlux = + ParallelFlux.from(Flux.just("Hello")) + .collect(ArrayList::new, ArrayList::add); + + Publisher> lifted = + Operators., ArrayList>liftPublisher((pub, sub) -> new CoreSubscriber>() { + @Override + public void onSubscribe(Subscription s) { + executorService.submit(() -> sub.onSubscribe(s)); + } + + @Override + public void onNext(ArrayList s) { + executorService.submit(() -> sub.onNext(s)); + } + + @Override + public void onError(Throwable t) { + executorService.submit(() -> sub.onError(t)); + } + + @Override + public void onComplete() { + executorService.submit(sub::onComplete); + } + }) + .apply(parallelFlux); + + return ((ParallelFlux) lifted).sequential(); + }); } @Test void parallelFluxFromThreadSwitchingMono() { - AtomicReference value = new AtomicReference<>(); - - Mono mono = new ThreadSwitchingMono<>("Hello", executorService); - - ParallelFlux.from(mono) - .sequential() - .doOnNext(i -> value.set(REF.get())) - .contextWrite(Context.of(KEY, "present")) - .blockLast(); - - assertThat(value.get()).isEqualTo("present"); + assertThreadLocalsPresentInFlux(() -> + ParallelFlux.from(threadSwitchingMono()).sequential()); } @Test void parallelFluxFromThreadSwitchingFlux() { - AtomicReference value = new AtomicReference<>(); - - Flux flux = new ThreadSwitchingFlux<>("Hello", executorService); - - ParallelFlux.from(flux) - .sequential() - .doOnNext(i -> value.set(REF.get())) - .contextWrite(Context.of(KEY, "present")) - .blockLast(); - - assertThat(value.get()).isEqualTo("present"); + assertThreadLocalsPresentInFlux(() -> + ParallelFlux.from(threadSwitchingFlux()).sequential()); } @Test @@ -1227,26 +1054,16 @@ void threadSwitchingParallelFluxSequential() { @Test void threadSwitchingParallelFluxThen() { - AtomicReference value = new AtomicReference<>(); - new ThreadSwitchingParallelFlux("Hello", executorService) - .then() - .doOnSuccess(v -> value.set(REF.get())) - .contextWrite(Context.of(KEY, "present")) - .block(); - - assertThat(value.get()).isEqualTo("present"); + assertThreadLocalsPresentInMono(() -> + new ThreadSwitchingParallelFlux("Hello", executorService) + .then()); } @Test void threadSwitchingParallelFluxOrdered() { - AtomicReference value = new AtomicReference<>(); - new ThreadSwitchingParallelFlux("Hello", executorService) - .ordered(Comparator.naturalOrder()) - .doOnNext(i -> value.set(REF.get())) - .contextWrite(Context.of(KEY, "present")) - .blockLast(); - - assertThat(value.get()).isEqualTo("present"); + assertThreadLocalsPresentInFlux(() -> + new ThreadSwitchingParallelFlux("Hello", executorService) + .ordered(Comparator.naturalOrder())); } @Test @@ -1292,14 +1109,9 @@ void threadSwitchingParallelFluxGroup() { @Test void threadSwitchingParallelFluxSort() { - AtomicReference value = new AtomicReference<>(); - new ThreadSwitchingParallelFlux("Hello", executorService) - .sorted(Comparator.naturalOrder()) - .doOnNext(i -> value.set(REF.get())) - .contextWrite(Context.of(KEY, "present")) - .blockLast(); - - assertThat(value.get()).isEqualTo("present"); + assertThreadLocalsPresentInFlux(() -> + new ThreadSwitchingParallelFlux("Hello", executorService) + .sorted(Comparator.naturalOrder())); } // Sinks tests @@ -1330,44 +1142,31 @@ void sink() throws InterruptedException, TimeoutException { @Test void sinkDirect() throws InterruptedException, TimeoutException { - AtomicReference value = new AtomicReference<>(); - AtomicReference error = new AtomicReference<>(); - AtomicBoolean complete = new AtomicBoolean(); - CountDownLatch latch = new CountDownLatch(1); - - Sinks.One sink = Sinks.one(); - - CoreSubscriberWithContext subscriberWithContext = - new CoreSubscriberWithContext<>(value, error, latch, complete); - - sink.asMono() - .subscribe(subscriberWithContext); - - executorService.submit(() -> sink.tryEmitValue("Hello")); - - if (!latch.await(10, TimeUnit.MILLISECONDS)) { - throw new TimeoutException("timed out"); - } + Sinks.One sink1 = Sinks.one(); + assertThatThreadLocalsPresentDirectCoreSubscribe(sink1.asMono(), + () -> sink1.tryEmitValue("Hello")); - assertThat(value.get()).isEqualTo("present"); + Sinks.One sink2 = Sinks.one(); + assertThatThreadLocalsPresentDirectRawSubscribe(sink2.asMono(), + () -> sink2.tryEmitValue("Hello")); } @Test - void sinkEmpty() throws InterruptedException, TimeoutException { + void sinksEmpty() throws InterruptedException, TimeoutException { AtomicReference value = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - Sinks.Empty empty = Sinks.empty(); + Sinks.Empty spec = Sinks.empty(); - empty.asMono() - .doOnTerminate(() -> { + spec.asMono() + .doOnSuccess(ignored -> { value.set(REF.get()); latch.countDown(); }) .contextWrite(Context.of(KEY, "present")) .subscribe(); - executorService.submit(empty::tryEmitEmpty); + executorService.submit(spec::tryEmitEmpty); if (!latch.await(10, TimeUnit.MILLISECONDS)) { throw new TimeoutException("timed out"); @@ -1376,6 +1175,15 @@ void sinkEmpty() throws InterruptedException, TimeoutException { assertThat(value.get()).isEqualTo("present"); } + @Test + void sinksEmptyDirect() throws InterruptedException { + Sinks.Empty empty1 = Sinks.empty(); + assertThatThreadLocalsPresentDirectCoreSubscribe(empty1.asMono(), empty1::tryEmitEmpty); + + Sinks.Empty empty2 = Sinks.empty(); + assertThatThreadLocalsPresentDirectRawSubscribe(empty2.asMono(), empty2::tryEmitEmpty); + } + @Test void sinkManyUnicast() throws InterruptedException, TimeoutException { AtomicReference value = new AtomicReference<>(); @@ -1402,6 +1210,25 @@ void sinkManyUnicast() throws InterruptedException, TimeoutException { assertThat(value.get()).isEqualTo("present"); } + @Test + void sinkManyUnicastDirect() throws InterruptedException { + Sinks.Many many1 = Sinks.many().unicast() + .onBackpressureBuffer(); + + assertThatThreadLocalsPresentDirectCoreSubscribe(many1.asFlux(), () -> { + many1.tryEmitNext("Hello"); + many1.tryEmitComplete(); + }); + + Sinks.Many many2 = Sinks.many().unicast() + .onBackpressureBuffer(); + + assertThatThreadLocalsPresentDirectRawSubscribe(many2.asFlux(), () -> { + many2.tryEmitNext("Hello"); + many2.tryEmitComplete(); + }); + } + @Test void sinkManyUnicastNoBackpressure() throws InterruptedException, TimeoutException { @@ -1504,30 +1331,6 @@ void sinkManyMulticastBestEffort() throws InterruptedException, TimeoutException assertThat(value.get()).isEqualTo("present"); } - @Test - void sinksEmpty() throws InterruptedException, TimeoutException { - AtomicReference value = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); - - Sinks.Empty spec = Sinks.empty(); - - spec.asMono() - .doOnSuccess(ignored -> { - value.set(REF.get()); - latch.countDown(); - }) - .contextWrite(Context.of(KEY, "present")) - .subscribe(); - - executorService.submit(spec::tryEmitEmpty); - - if (!latch.await(10, TimeUnit.MILLISECONDS)) { - throw new TimeoutException("timed out"); - } - - assertThat(value.get()).isEqualTo("present"); - } - // Other List> getAllClassesInClasspathRecursively(File directory) throws Exception { @@ -1557,6 +1360,7 @@ List> getAllClassesInClasspathRecursively(File directory) throws Except } @Test + @Disabled("Used to find Publishers that can switch threads") void printInterestingClasses() throws Exception { List> allClasses = getAllClassesInClasspathRecursively(new File("./build/classes/java/main/reactor/")); @@ -1592,18 +1396,28 @@ void printInterestingClasses() throws Exception { private class CoreSubscriberWithContext implements CoreSubscriber { - private final AtomicReference value; + private final AtomicReference valueInOnNext; + private final AtomicReference valueInOnComplete; + private final AtomicReference valueInOnError; private final AtomicReference error; private final CountDownLatch latch; private final AtomicBoolean complete; + private final AtomicBoolean hadNext; - public CoreSubscriberWithContext(AtomicReference value, + public CoreSubscriberWithContext( + AtomicReference valueInOnNext, + AtomicReference valueInOnComplete, + AtomicReference valueInOnError, AtomicReference error, CountDownLatch latch, + AtomicBoolean hadNext, AtomicBoolean complete) { - this.value = value; + this.valueInOnNext = valueInOnNext; + this.valueInOnComplete = valueInOnComplete; + this.valueInOnError = valueInOnError; this.error = error; this.latch = latch; + this.hadNext = hadNext; this.complete = complete; } @@ -1619,18 +1433,21 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { - value.set(REF.get()); + hadNext.set(true); + valueInOnNext.set(REF.get()); } @Override public void onError(Throwable t) { error.set(t); + valueInOnError.set(REF.get()); latch.countDown(); } @Override public void onComplete() { complete.set(true); + valueInOnComplete.set(REF.get()); latch.countDown(); } }