diff --git a/src/main/java/com/pivovarit/collectors/CompletionOrderSpliterator.java b/src/main/java/com/pivovarit/collectors/CompletionOrderSpliterator.java index 8ab52d94..6fc75eda 100644 --- a/src/main/java/com/pivovarit/collectors/CompletionOrderSpliterator.java +++ b/src/main/java/com/pivovarit/collectors/CompletionOrderSpliterator.java @@ -1,11 +1,12 @@ package com.pivovarit.collectors; -import java.util.List; import java.util.Spliterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; +import java.util.stream.Stream; /** * @author Grzegorz Piwowarek @@ -16,10 +17,15 @@ final class CompletionOrderSpliterator implements Spliterator { private final BlockingQueue> completed = new LinkedBlockingQueue<>(); private int remaining; - CompletionOrderSpliterator(List> futures) { - this.initialSize = futures.size(); + CompletionOrderSpliterator(Stream> futures) { + AtomicInteger size = new AtomicInteger(); + futures.forEach(f -> { + f.whenComplete((__, ___) -> completed.add(f)); + size.incrementAndGet(); + }); + + this.initialSize = size.get(); this.remaining = initialSize; - futures.forEach(f -> f.whenComplete((__, ___) -> completed.add(f))); } @Override diff --git a/src/main/java/com/pivovarit/collectors/ParallelStreamCollector.java b/src/main/java/com/pivovarit/collectors/ParallelStreamCollector.java index fecf0de9..2ce69b1b 100644 --- a/src/main/java/com/pivovarit/collectors/ParallelStreamCollector.java +++ b/src/main/java/com/pivovarit/collectors/ParallelStreamCollector.java @@ -29,18 +29,18 @@ /** * @author Grzegorz Piwowarek */ -class ParallelStreamCollector implements Collector>, Stream> { +class ParallelStreamCollector implements Collector>, Stream> { private static final EnumSet UNORDERED = EnumSet.of(Characteristics.UNORDERED); private final Dispatcher dispatcher; private final Function function; - private final Function>, Stream> processor; + private final Function>, Stream> processor; private final Set characteristics; private ParallelStreamCollector( Function function, - Function>, Stream> processor, + Function>, Stream> processor, Set characteristics, Dispatcher dispatcher) { this.processor = processor; @@ -56,12 +56,12 @@ private void startConsuming() { } @Override - public Supplier>> supplier() { - return ArrayList::new; + public Supplier>> supplier() { + return Stream::builder; } @Override - public BiConsumer>, T> accumulator() { + public BiConsumer>, T> accumulator() { return (acc, e) -> { startConsuming(); acc.add(dispatcher.enqueue(() -> function.apply(e))); @@ -69,17 +69,17 @@ public BiConsumer>, T> accumulator() { } @Override - public BinaryOperator>> combiner() { + public BinaryOperator>> combiner() { return (left, right) -> { throw new UnsupportedOperationException(); }; } @Override - public Function>, Stream> finisher() { + public Function>, Stream> finisher() { return acc -> { dispatcher.stop(); - return processor.apply(acc); + return processor.apply(acc.build()); }; } @@ -116,12 +116,12 @@ public Set characteristics() { : new ParallelStreamCollector<>(mapper, streamOrderedStrategy(), emptySet(), limiting(executor, parallelism)); } - private static Function>, Stream> streamInCompletionOrderStrategy() { + private static Function>, Stream> streamInCompletionOrderStrategy() { return futures -> StreamSupport.stream(new CompletionOrderSpliterator<>(futures), false); } - private static Function>, Stream> streamOrderedStrategy() { - return futures -> futures.stream().map(CompletableFuture::join); + private static Function>, Stream> streamOrderedStrategy() { + return futures -> futures.map(CompletableFuture::join); } static final class BatchingCollectors { diff --git a/src/test/java/com/pivovarit/collectors/CompletionOrderSpliteratorTest.java b/src/test/java/com/pivovarit/collectors/CompletionOrderSpliteratorTest.java index 226b5a0a..a53efc25 100644 --- a/src/test/java/com/pivovarit/collectors/CompletionOrderSpliteratorTest.java +++ b/src/test/java/com/pivovarit/collectors/CompletionOrderSpliteratorTest.java @@ -33,7 +33,7 @@ void shouldTraverseInCompletionOrder() { f2.complete(1); }); List results = StreamSupport.stream( - new CompletionOrderSpliterator<>(futures), false) + new CompletionOrderSpliterator<>(futures.stream()), false) .collect(Collectors.toList()); assertThat(results).containsExactly(3, 2, 1); @@ -54,7 +54,7 @@ void shouldPropagateException() { f2.complete(1); }); assertThatThrownBy(() -> StreamSupport.stream( - new CompletionOrderSpliterator<>(futures), false) + new CompletionOrderSpliterator<>(futures.stream()), false) .collect(Collectors.toList())) .isInstanceOf(CompletionException.class) .hasCauseExactlyInstanceOf(RuntimeException.class); @@ -74,7 +74,7 @@ void shouldStreamInCompletionOrder() { List> futures = asList(new CompletableFuture<>(), CompletableFuture .completedFuture(value)); - Optional result = StreamSupport.stream(new CompletionOrderSpliterator<>(futures), false).findAny(); + Optional result = StreamSupport.stream(new CompletionOrderSpliterator<>(futures.stream()), false).findAny(); assertThat(result).contains(value); } @@ -83,7 +83,7 @@ void shouldStreamInCompletionOrder() { void shouldNotConsumeOnEmpty() { List> futures = Collections.emptyList(); - Spliterator spliterator = new CompletionOrderSpliterator<>(futures); + Spliterator spliterator = new CompletionOrderSpliterator<>(futures.stream()); ResultHolder result = new ResultHolder<>(); boolean consumed = spliterator.tryAdvance(result);