From e3bccdf4bb2a7cd3bd8d35ceb80bd979551582f9 Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Sun, 15 Sep 2024 16:39:22 +0200 Subject: [PATCH] Rewrite non-blocking tests --- .../pivovarit/collectors/FunctionalTest.java | 15 ----- .../collectors/test/BasicParallelismTest.java | 2 +- .../collectors/test/NonBlockingTest.java | 61 +++++++++++++++++++ 3 files changed, 62 insertions(+), 16 deletions(-) create mode 100644 src/test/java/com/pivovarit/collectors/test/NonBlockingTest.java diff --git a/src/test/java/com/pivovarit/collectors/FunctionalTest.java b/src/test/java/com/pivovarit/collectors/FunctionalTest.java index 9294cefe..b2d00f37 100644 --- a/src/test/java/com/pivovarit/collectors/FunctionalTest.java +++ b/src/test/java/com/pivovarit/collectors/FunctionalTest.java @@ -166,7 +166,6 @@ void shouldExecuteEagerlyOnProvidedThreadPool() { private static > Stream virtualThreadsTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name, boolean maintainsOrder) { var tests = of( shouldStartConsumingImmediately(collector, name), - shouldNotBlockTheCallingThread(collector, name), shouldHandleThrowable(collector, name), shouldShortCircuitOnException(collector, name), shouldInterruptOnException(collector, name), @@ -181,7 +180,6 @@ private static > Stream virtualThread private static > Stream tests(CollectorSupplier, Executor, Integer, Collector>> collector, String name, boolean maintainsOrder, boolean limitedParallelism) { var tests = of( shouldStartConsumingImmediately(collector, name), - shouldNotBlockTheCallingThread(collector, name), shouldHandleThrowable(collector, name), shouldShortCircuitOnException(collector, name), shouldInterruptOnException(collector, name), @@ -200,7 +198,6 @@ private static > Stream tests(Collect private static > Stream virtualThreadsStreamingTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name, boolean maintainsOrder) { var tests = of( shouldStartConsumingImmediately(collector, name), - shouldNotBlockTheCallingThread(collector, name), shouldHandleThrowable(collector, name), shouldShortCircuitOnException(collector, name), shouldRemainConsistent(collector, name) @@ -214,7 +211,6 @@ private static > Stream virtualThread private static > Stream streamingTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name, boolean maintainsOrder) { var tests = of( shouldStartConsumingImmediately(collector, name), - shouldNotBlockTheCallingThread(collector, name), shouldRespectParallelism(collector, name), shouldPushElementsToStreamAsSoonAsPossible(collector, name), shouldHandleThrowable(collector, name), @@ -242,16 +238,6 @@ private static > Stream batchStreamin of(shouldProcessOnNThreadsETParallelism(collector, name))); } - private static > DynamicTest shouldNotBlockTheCallingThread(CollectorSupplier, Executor, Integer, Collector>> c, String name) { - return dynamicTest(format("%s: should not block when returning future", name), () -> { - withExecutor(e -> { - assertTimeoutPreemptively(ofMillis(100), () -> - Stream.empty().collect(c - .apply(i -> returnWithDelay(42, ofMillis(Integer.MAX_VALUE)), e, 1)), "returned blocking future"); - }); - }); - } - private static > DynamicTest shouldRespectParallelism(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { return dynamicTest(format("%s: should respect parallelism", name), () -> { int parallelism = 2; @@ -274,7 +260,6 @@ private static > DynamicTest shouldPushElementsToS return dynamicTest(format("%s: should push elements as soon as possible ", name), () -> { int parallelism = 2; int delayMillis = 50; - var counter = new AtomicInteger(); withExecutor(e -> { LocalTime before = LocalTime.now(); Stream.generate(() -> 42) diff --git a/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java b/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java index c60a7b14..c8ec9dc4 100644 --- a/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java +++ b/src/test/java/com/pivovarit/collectors/test/BasicParallelismTest.java @@ -59,7 +59,7 @@ static CollectorDefinition collector(String name, CollectorFactory< } @FunctionalInterface - interface CollectorFactory { + private interface CollectorFactory { Collector> collector(Function f, Integer p); } diff --git a/src/test/java/com/pivovarit/collectors/test/NonBlockingTest.java b/src/test/java/com/pivovarit/collectors/test/NonBlockingTest.java new file mode 100644 index 00000000..5f1757f5 --- /dev/null +++ b/src/test/java/com/pivovarit/collectors/test/NonBlockingTest.java @@ -0,0 +1,61 @@ +package com.pivovarit.collectors.test; + +import com.pivovarit.collectors.ParallelCollectors; +import org.junit.jupiter.api.DynamicTest; +import org.junit.jupiter.api.TestFactory; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.function.Function; +import java.util.stream.Collector; +import java.util.stream.Stream; + +import static com.pivovarit.collectors.TestUtils.returnWithDelay; +import static com.pivovarit.collectors.test.NonBlockingTest.CollectorDefinition.collector; +import static java.time.Duration.ofDays; +import static java.util.stream.Collectors.collectingAndThen; +import static java.util.stream.Collectors.toList; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; + +class NonBlockingTest { + + private static Stream> allAsync() { + return Stream.of( + collector("parallel()", f -> collectingAndThen(ParallelCollectors.parallel(f), c -> c.thenApply(Stream::toList))), + collector("parallel(toList())", f -> ParallelCollectors.parallel(f, toList())), + collector("parallel(toList(), e)", f -> ParallelCollectors.parallel(f, toList(), e())), + collector("parallel(toList(), e, p=1)", f -> ParallelCollectors.parallel(f, toList(), e(), 1)), + collector("parallel(toList(), e, p=2)", f -> ParallelCollectors.parallel(f, toList(), e(), 2)), + collector("parallel(toList(), e, p=1) [batching]", f -> ParallelCollectors.Batching.parallel(f, toList(), e(), 1)), + collector("parallel(toList(), e, p=2) [batching]", f -> ParallelCollectors.Batching.parallel(f, toList(), e(), 2)) + ); + } + + @TestFactory + Stream shouldNotBlockTheCallingThread() { + return allAsync() + .map(c -> DynamicTest.dynamicTest(c.name(), () -> { + assertTimeoutPreemptively(Duration.ofMillis(100), () -> { + var __ = Stream.of(1, 2, 3, 4).collect(c.factory().collector(i -> returnWithDelay(i, ofDays(1)))); + }); + })); + } + + protected record CollectorDefinition(String name, CollectorFactory factory) { + static CollectorDefinition collector(String name, CollectorFactory collector) { + return new CollectorDefinition<>(name, collector); + } + } + + @FunctionalInterface + private interface CollectorFactory { + Collector>> collector(Function f); + } + + private static Executor e() { + return Executors.newCachedThreadPool(); + } +}