diff --git a/src/test/java/com/pivovarit/collectors/FunctionalTest.java b/src/test/java/com/pivovarit/collectors/FunctionalTest.java index b2d00f37..14ac0c90 100644 --- a/src/test/java/com/pivovarit/collectors/FunctionalTest.java +++ b/src/test/java/com/pivovarit/collectors/FunctionalTest.java @@ -63,14 +63,14 @@ class FunctionalTest { Stream collectors() { return of( // virtual threads - virtualThreadsTests((m, e, p) -> parallel(m, toList()), "ParallelCollectors.parallel(toList()) [virtual]", true), - virtualThreadsTests((m, e, p) -> parallel(m, toList(), p), "ParallelCollectors.parallel(toList()) [virtual]", true), - virtualThreadsTests((m, e, p) -> parallel(m, toSet()), "ParallelCollectors.parallel(toSet()) [virtual]", false), - virtualThreadsTests((m, e, p) -> parallel(m, toSet(), p), "ParallelCollectors.parallel(toSet()) [virtual]", false), - virtualThreadsTests((m, e, p) -> parallel(m, toCollection(LinkedList::new)), "ParallelCollectors.parallel(toCollection()) [virtual]", true), - virtualThreadsTests((m, e, p) -> parallel(m, toCollection(LinkedList::new), p), "ParallelCollectors.parallel(toCollection()) [virtual]", true), - virtualThreadsTests((m, e, p) -> adapt(parallel(m)), "ParallelCollectors.parallel() [virtual]", true), - virtualThreadsTests((m, e, p) -> adapt(parallel(m, p)), "ParallelCollectors.parallel() [virtual]", true), + virtualThreadsTests((m, e, p) -> parallel(m, toList()), "ParallelCollectors.parallel(toList()) [virtual]"), + virtualThreadsTests((m, e, p) -> parallel(m, toList(), p), "ParallelCollectors.parallel(toList()) [virtual]"), + virtualThreadsTests((m, e, p) -> parallel(m, toSet()), "ParallelCollectors.parallel(toSet()) [virtual]"), + virtualThreadsTests((m, e, p) -> parallel(m, toSet(), p), "ParallelCollectors.parallel(toSet()) [virtual]"), + virtualThreadsTests((m, e, p) -> parallel(m, toCollection(LinkedList::new)), "ParallelCollectors.parallel(toCollection()) [virtual]"), + virtualThreadsTests((m, e, p) -> parallel(m, toCollection(LinkedList::new), p), "ParallelCollectors.parallel(toCollection()) [virtual]"), + virtualThreadsTests((m, e, p) -> adapt(parallel(m)), "ParallelCollectors.parallel() [virtual]"), + virtualThreadsTests((m, e, p) -> adapt(parallel(m, p)), "ParallelCollectors.parallel() [virtual]"), // platform threads tests((m, e, p) -> parallel(m, toList(), e, p), format("ParallelCollectors.parallel(toList(), p=%d)", PARALLELISM), true, true), tests((m, e, p) -> parallel(m, toSet(), e, p), format("ParallelCollectors.parallel(toSet(), p=%d)", PARALLELISM), false, true), @@ -90,8 +90,8 @@ Stream collectors() { Stream streaming_collectors() { return of( // virtual threads - virtualThreadsStreamingTests((m, e, p) -> adaptAsync(parallelToStream(m)), "ParallelCollectors.parallelToStream() [virtual]", false), - virtualThreadsStreamingTests((m, e, p) -> adaptAsync(parallelToOrderedStream(m)), "ParallelCollectors.parallelToOrderedStream() [virtual]", true), + virtualThreadsStreamingTests((m, e, p) -> adaptAsync(parallelToStream(m)), "ParallelCollectors.parallelToStream() [virtual]"), + virtualThreadsStreamingTests((m, e, p) -> adaptAsync(parallelToOrderedStream(m)), "ParallelCollectors.parallelToOrderedStream() [virtual]"), // platform threads streamingTests((m, e, p) -> adaptAsync(parallelToStream(m, e, p)), format("ParallelCollectors.parallelToStream(p=%d)", PARALLELISM), false), streamingTests((m, e, p) -> adaptAsync(parallelToOrderedStream(m, e, p)), format("ParallelCollectors.parallelToOrderedStream(p=%d)", PARALLELISM), true) @@ -163,18 +163,14 @@ void shouldExecuteEagerlyOnProvidedThreadPool() { } } - private static > Stream virtualThreadsTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name, boolean maintainsOrder) { - var tests = of( + private static > Stream virtualThreadsTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { + return of( shouldStartConsumingImmediately(collector, name), shouldHandleThrowable(collector, name), shouldShortCircuitOnException(collector, name), shouldInterruptOnException(collector, name), shouldRemainConsistent(collector, name) ); - - return maintainsOrder - ? Stream.concat(tests, of(shouldMaintainOrder(collector, name))) - : tests; } private static > Stream tests(CollectorSupplier, Executor, Integer, Collector>> collector, String name, boolean maintainsOrder, boolean limitedParallelism) { @@ -195,17 +191,13 @@ private static > Stream tests(Collect return tests; } - private static > Stream virtualThreadsStreamingTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name, boolean maintainsOrder) { - var tests = of( + private static > Stream virtualThreadsStreamingTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name) { + return of( shouldStartConsumingImmediately(collector, name), shouldHandleThrowable(collector, name), shouldShortCircuitOnException(collector, name), shouldRemainConsistent(collector, name) ); - - return maintainsOrder - ? Stream.concat(tests, of(shouldMaintainOrder(collector, name))) - : tests; } private static > Stream streamingTests(CollectorSupplier, Executor, Integer, Collector>> collector, String name, boolean maintainsOrder) {