Skip to content

Commit

Permalink
Use Stream.builder instead of ArrayList (#470)
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit authored Mar 21, 2020
1 parent f776dd2 commit 634317a
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package com.pivovarit.collectors;

import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Stream;

/**
* @author Grzegorz Piwowarek
Expand All @@ -16,10 +17,15 @@ final class CompletionOrderSpliterator<T> implements Spliterator<T> {
private final BlockingQueue<CompletableFuture<T>> completed = new LinkedBlockingQueue<>();
private int remaining;

CompletionOrderSpliterator(List<CompletableFuture<T>> futures) {
this.initialSize = futures.size();
CompletionOrderSpliterator(Stream<CompletableFuture<T>> futures) {
AtomicInteger size = new AtomicInteger();
futures.forEach(f -> {
f.whenComplete((__, ___) -> completed.add(f));
size.incrementAndGet();
});

this.initialSize = size.get();
this.remaining = initialSize;
futures.forEach(f -> f.whenComplete((__, ___) -> completed.add(f)));
}

@Override
Expand Down
24 changes: 12 additions & 12 deletions src/main/java/com/pivovarit/collectors/ParallelStreamCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@
/**
* @author Grzegorz Piwowarek
*/
class ParallelStreamCollector<T, R> implements Collector<T, List<CompletableFuture<R>>, Stream<R>> {
class ParallelStreamCollector<T, R> implements Collector<T, Stream.Builder<CompletableFuture<R>>, Stream<R>> {

private static final EnumSet<Characteristics> UNORDERED = EnumSet.of(Characteristics.UNORDERED);

private final Dispatcher<R> dispatcher;
private final Function<T, R> function;
private final Function<List<CompletableFuture<R>>, Stream<R>> processor;
private final Function<Stream<CompletableFuture<R>>, Stream<R>> processor;
private final Set<Characteristics> characteristics;

private ParallelStreamCollector(
Function<T, R> function,
Function<List<CompletableFuture<R>>, Stream<R>> processor,
Function<Stream<CompletableFuture<R>>, Stream<R>> processor,
Set<Characteristics> characteristics,
Dispatcher<R> dispatcher) {
this.processor = processor;
Expand All @@ -56,30 +56,30 @@ private void startConsuming() {
}

@Override
public Supplier<List<CompletableFuture<R>>> supplier() {
return ArrayList::new;
public Supplier<Stream.Builder<CompletableFuture<R>>> supplier() {
return Stream::builder;
}

@Override
public BiConsumer<List<CompletableFuture<R>>, T> accumulator() {
public BiConsumer<Stream.Builder<CompletableFuture<R>>, T> accumulator() {
return (acc, e) -> {
startConsuming();
acc.add(dispatcher.enqueue(() -> function.apply(e)));
};
}

@Override
public BinaryOperator<List<CompletableFuture<R>>> combiner() {
public BinaryOperator<Stream.Builder<CompletableFuture<R>>> combiner() {
return (left, right) -> {
throw new UnsupportedOperationException();
};
}

@Override
public Function<List<CompletableFuture<R>>, Stream<R>> finisher() {
public Function<Stream.Builder<CompletableFuture<R>>, Stream<R>> finisher() {
return acc -> {
dispatcher.stop();
return processor.apply(acc);
return processor.apply(acc.build());
};
}

Expand Down Expand Up @@ -116,12 +116,12 @@ public Set<Characteristics> characteristics() {
: new ParallelStreamCollector<>(mapper, streamOrderedStrategy(), emptySet(), limiting(executor, parallelism));
}

private static <R> Function<List<CompletableFuture<R>>, Stream<R>> streamInCompletionOrderStrategy() {
private static <R> Function<Stream<CompletableFuture<R>>, Stream<R>> streamInCompletionOrderStrategy() {
return futures -> StreamSupport.stream(new CompletionOrderSpliterator<>(futures), false);
}

private static <R> Function<List<CompletableFuture<R>>, Stream<R>> streamOrderedStrategy() {
return futures -> futures.stream().map(CompletableFuture::join);
private static <R> Function<Stream<CompletableFuture<R>>, Stream<R>> streamOrderedStrategy() {
return futures -> futures.map(CompletableFuture::join);
}

static final class BatchingCollectors {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ void shouldTraverseInCompletionOrder() {
f2.complete(1);
});
List<Integer> results = StreamSupport.stream(
new CompletionOrderSpliterator<>(futures), false)
new CompletionOrderSpliterator<>(futures.stream()), false)
.collect(Collectors.toList());

assertThat(results).containsExactly(3, 2, 1);
Expand All @@ -54,7 +54,7 @@ void shouldPropagateException() {
f2.complete(1);
});
assertThatThrownBy(() -> StreamSupport.stream(
new CompletionOrderSpliterator<>(futures), false)
new CompletionOrderSpliterator<>(futures.stream()), false)
.collect(Collectors.toList()))
.isInstanceOf(CompletionException.class)
.hasCauseExactlyInstanceOf(RuntimeException.class);
Expand All @@ -74,7 +74,7 @@ void shouldStreamInCompletionOrder() {
List<CompletableFuture<Integer>> futures = asList(new CompletableFuture<>(), CompletableFuture
.completedFuture(value));

Optional<Integer> result = StreamSupport.stream(new CompletionOrderSpliterator<>(futures), false).findAny();
Optional<Integer> result = StreamSupport.stream(new CompletionOrderSpliterator<>(futures.stream()), false).findAny();

assertThat(result).contains(value);
}
Expand All @@ -83,7 +83,7 @@ void shouldStreamInCompletionOrder() {
void shouldNotConsumeOnEmpty() {
List<CompletableFuture<Integer>> futures = Collections.emptyList();

Spliterator<Integer> spliterator = new CompletionOrderSpliterator<>(futures);
Spliterator<Integer> spliterator = new CompletionOrderSpliterator<>(futures.stream());

ResultHolder<Integer> result = new ResultHolder<>();
boolean consumed = spliterator.tryAdvance(result);
Expand Down

0 comments on commit 634317a

Please sign in to comment.