From 848fa884094a488ba304f4c583f94f26be92b2e6 Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Sun, 28 Jan 2024 15:28:53 +0100 Subject: [PATCH] Seal CompletionStrategy --- .../collectors/AsyncParallelCollector.java | 1 - .../collectors/CompletionStrategy.java | 23 ++++++++++++++++--- .../com/pivovarit/collectors/Dispatcher.java | 16 +++++-------- .../collectors/FutureCollectors.java | 6 ++--- .../collectors/ArchitectureTest.java | 1 + 5 files changed, 29 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java b/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java index bd6a782b..7af367c9 100644 --- a/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java +++ b/src/main/java/com/pivovarit/collectors/AsyncParallelCollector.java @@ -7,7 +7,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.function.BiConsumer; import java.util.function.BinaryOperator; import java.util.function.Function; diff --git a/src/main/java/com/pivovarit/collectors/CompletionStrategy.java b/src/main/java/com/pivovarit/collectors/CompletionStrategy.java index bd74edcd..9feb90f1 100644 --- a/src/main/java/com/pivovarit/collectors/CompletionStrategy.java +++ b/src/main/java/com/pivovarit/collectors/CompletionStrategy.java @@ -6,13 +6,30 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; -interface CompletionStrategy extends Function>, Stream> { +sealed interface CompletionStrategy extends Function>, Stream> permits CompletionStrategy.Unordered, CompletionStrategy.Ordered { + + Unordered UNORDERED = new Unordered<>(); + Ordered ORDERED = new Ordered<>(); static CompletionStrategy unordered() { - return futures -> StreamSupport.stream(new CompletionOrderSpliterator<>(futures), false); + return (CompletionStrategy) UNORDERED; } static CompletionStrategy ordered() { - return futures -> futures.stream().map(CompletableFuture::join); + return (CompletionStrategy) ORDERED; + } + + final class Unordered implements CompletionStrategy { + @Override + public Stream apply(List> futures) { + return StreamSupport.stream(new CompletionOrderSpliterator<>(futures), false); + } + } + + final class Ordered implements CompletionStrategy { + @Override + public Stream apply(List> futures) { + return futures.stream().map(CompletableFuture::join); + } } } diff --git a/src/main/java/com/pivovarit/collectors/Dispatcher.java b/src/main/java/com/pivovarit/collectors/Dispatcher.java index f9d1f0d1..dffa9a8d 100644 --- a/src/main/java/com/pivovarit/collectors/Dispatcher.java +++ b/src/main/java/com/pivovarit/collectors/Dispatcher.java @@ -54,7 +54,12 @@ private FutureTask completionTask(Supplier supplier, InterruptibleCompleta if (limiter == null) { future.complete(supplier.get()); } else { - withLimiter(supplier, future); + try { + limiter.acquire(); + future.complete(supplier.get()); + } finally { + limiter.release(); + } } } catch (Throwable e) { completionSignaller.completeExceptionally(e); @@ -65,15 +70,6 @@ private FutureTask completionTask(Supplier supplier, InterruptibleCompleta return task; } - private void withLimiter(Supplier supplier, InterruptibleCompletableFuture future) throws InterruptedException { - try { - limiter.acquire(); - future.complete(supplier.get()); - } finally { - limiter.release(); - } - } - private static BiConsumer shortcircuit(InterruptibleCompletableFuture future) { return (__, throwable) -> { if (throwable != null) { diff --git a/src/main/java/com/pivovarit/collectors/FutureCollectors.java b/src/main/java/com/pivovarit/collectors/FutureCollectors.java index 16156ac9..afab87b4 100644 --- a/src/main/java/com/pivovarit/collectors/FutureCollectors.java +++ b/src/main/java/com/pivovarit/collectors/FutureCollectors.java @@ -13,14 +13,12 @@ final class FutureCollectors { static Collector, ?, CompletableFuture> toFuture(Collector collector) { return Collectors.collectingAndThen(toList(), list -> { - CompletableFuture future = CompletableFuture - .allOf(list.toArray(new CompletableFuture[0])) + var future = CompletableFuture.allOf(list.toArray(CompletableFuture[]::new)) .thenApply(__ -> list.stream() .map(CompletableFuture::join) .collect(collector)); - // CompletableFuture#allOf doesn't shortcircuit on exception so that requires manual handling - for (CompletableFuture f : list) { + for (var f : list) { f.whenComplete((t, throwable) -> { if (throwable != null) { future.completeExceptionally(throwable); diff --git a/src/test/java/com/pivovarit/collectors/ArchitectureTest.java b/src/test/java/com/pivovarit/collectors/ArchitectureTest.java index 75e06af3..29dc68db 100644 --- a/src/test/java/com/pivovarit/collectors/ArchitectureTest.java +++ b/src/test/java/com/pivovarit/collectors/ArchitectureTest.java @@ -16,6 +16,7 @@ class ArchitectureTest { @ArchTest static final ArchRule shouldHaveSingleFacade = classes() .that().arePublic() + .and().areNotNestedClasses() .should().haveSimpleName("ParallelCollectors").orShould().haveSimpleName("Batching") .andShould().haveOnlyFinalFields() .andShould().haveOnlyPrivateConstructors()