Skip to content

Commit

Permalink
Merge pull request aNNiMON#179 from aNNiMON/teeing-collector
Browse files Browse the repository at this point in the history
Add teeing collector. Close aNNiMON#178
  • Loading branch information
aNNiMON authored Mar 22, 2019
2 parents 5405748 + aa12b5d commit e7e9506
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 40 deletions.
127 changes: 99 additions & 28 deletions stream/src/main/java/com/annimon/stream/Collectors.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.annimon.stream.function.*;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -874,13 +875,12 @@ public void accept(U u) {
*/
@NotNull
public static <T, A, IR, OR> Collector<T, A, OR> collectingAndThen(
@NotNull Collector<T, A, IR> c, Function<IR, OR> finisher) {
Function<A, IR> downstreamFinisher = c.finisher();
if (downstreamFinisher == null) {
downstreamFinisher = castIdentity();
}
@NotNull Collector<T, A, IR> c,
@NotNull Function<IR, OR> finisher) {
Objects.requireNonNull(c);
Objects.requireNonNull(finisher);
return new CollectorsImpl<T, A, OR>(c.supplier(), c.accumulator(),
Function.Util.andThen(downstreamFinisher, finisher));
Function.Util.andThen(c.finisher(), finisher));
}

/**
Expand Down Expand Up @@ -943,24 +943,21 @@ public static <T, A, IR, OR> Collector<T, A, OR> collectingAndThen(

@SuppressWarnings("unchecked")
final Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
Function<Map<K, A>, M> finisher = null;
if (downstreamFinisher != null) {
finisher = new Function<Map<K, A>, M>() {
@NotNull
@Override
public M apply(@NotNull Map<K, A> map) {
// Update values of a map by a finisher function
for (Map.Entry<K, A> entry : map.entrySet()) {
A value = entry.getValue();
value = downstreamFinisher.apply(value);
entry.setValue(value);
}
@SuppressWarnings("unchecked")
M castedMap = (M) map;
return castedMap;
Function<Map<K, A>, M> finisher = new Function<Map<K, A>, M>() {
@NotNull
@Override
public M apply(@NotNull Map<K, A> map) {
// Update values of a map by a finisher function
for (Map.Entry<K, A> entry : map.entrySet()) {
A value = entry.getValue();
value = downstreamFinisher.apply(value);
entry.setValue(value);
}
};
}
@SuppressWarnings("unchecked")
M castedMap = (M) map;
return castedMap;
}
};

@SuppressWarnings("unchecked")
Supplier<Map<K, A>> castedMapFactory = (Supplier<Map<K, A>>) mapFactory;
Expand Down Expand Up @@ -1041,10 +1038,7 @@ public void accept(@NotNull Tuple2<A> container, T t) {
@NotNull
@Override
public Map<Boolean, D> apply(@NotNull Tuple2<A> container) {
final Function<A, D> downstreamFinisher = downstream.finisher();
final Function<A, D> finisher = downstreamFinisher == null
? Collectors.<A, D>castIdentity()
: downstreamFinisher;
final Function<A, D> finisher = downstream.finisher();
Map<Boolean, D> result = new HashMap<Boolean, D>(2);
result.put(Boolean.TRUE, finisher.apply(container.a));
result.put(Boolean.FALSE, finisher.apply(container.b));
Expand All @@ -1054,6 +1048,83 @@ public Map<Boolean, D> apply(@NotNull Tuple2<A> container) {
);
}


/**
* Returns a {@code Collector} that composites two collectors.
* Each element is processed by two specified collectors,
* then their results are merged using the merge function into the final result.
*
* @param <T> the type of the input elements
* @param <R1> the result type of the first collector
* @param <R2> the result type of the second collector
* @param <R> the type of the final result
* @param downstream1 the first collector
* @param downstream2 the second collector
* @param merger the function which merges two results into the single one
* @return a {@code Collector}
* @since 1.2.2
*/
@NotNull
public static <T, R1, R2, R> Collector<T, ?, R> teeing(
@NotNull final Collector<? super T, ?, R1> downstream1,
@NotNull final Collector<? super T, ?, R2> downstream2,
@NotNull final BiFunction<? super R1, ? super R2, R> merger) {
return teeingImpl(downstream1, downstream2, merger);
}

private static <T, A1, A2, R1, R2, R> Collector<T, ?, R> teeingImpl(
@NotNull final Collector<? super T, A1, R1> downstream1,
@NotNull final Collector<? super T, A2, R2> downstream2,
@NotNull final BiFunction<? super R1, ? super R2, R> merger) {

Objects.requireNonNull(downstream1, "downstream1");
Objects.requireNonNull(downstream2, "downstream2");
Objects.requireNonNull(merger, "merger");

final Supplier<A1> supplier1 =
Objects.requireNonNull(downstream1.supplier(), "downstream1 supplier");
final Supplier<A2> supplier2 =
Objects.requireNonNull(downstream2.supplier(), "downstream2 supplier");

final BiConsumer<A1, ? super T> acc1 =
Objects.requireNonNull(downstream1.accumulator(), "downstream1 accumulator");
final BiConsumer<A2, ? super T> acc2 =
Objects.requireNonNull(downstream2.accumulator(), "downstream2 accumulator");

final Function<A1, R1> finisher1 =
Objects.requireNonNull(downstream1.finisher(), "downstream1 finisher");
final Function<A2, R2> finisher2 =
Objects.requireNonNull(downstream2.finisher(), "downstream2 finisher");

return new CollectorsImpl<T, Map.Entry<A1, A2>, R>(
new Supplier<Map.Entry<A1, A2>>() {
@NotNull
@Override
public Map.Entry<A1, A2> get() {
return new AbstractMap.SimpleEntry<A1, A2>(
supplier1.get(),
supplier2.get());
}
},
new BiConsumer<Map.Entry<A1, A2>, T>() {
@Override
public void accept(@NotNull Map.Entry<A1, A2> entry, T t) {
acc1.accept(entry.getKey(), t);
acc2.accept(entry.getValue(), t);
}
},
new Function<Map.Entry<A1, A2>, R>() {
@NotNull
@Override
public R apply(@NotNull Map.Entry<A1, A2> entry) {
return merger.apply(
finisher1.apply(entry.getKey()),
finisher2.apply(entry.getValue()));
}
}
);
}

@NotNull
private static <K, V> Supplier<Map<K, V>> hashMapSupplier() {
return new Supplier<Map<K, V>>() {
Expand Down Expand Up @@ -1140,7 +1211,7 @@ private static final class CollectorsImpl<T, A, R> implements Collector<T, A, R>
private final Function<A, R> finisher;

public CollectorsImpl(Supplier<A> supplier, BiConsumer<A, T> accumulator) {
this(supplier, accumulator, null);
this(supplier, accumulator, Collectors.<A, R>castIdentity());
}

public CollectorsImpl(Supplier<A> supplier, BiConsumer<A, T> accumulator, Function<A, R> finisher) {
Expand Down
4 changes: 1 addition & 3 deletions stream/src/main/java/com/annimon/stream/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -1825,9 +1825,7 @@ public <R, A> R collect(@NotNull Collector<? super T, A, R> collector) {
final T value = iterator.next();
collector.accumulator().accept(container, value);
}
if (collector.finisher() != null)
return collector.finisher().apply(container);
return Collectors.<A, R>castIdentity().apply(container);
return collector.finisher().apply(container);
}

/**
Expand Down
52 changes: 43 additions & 9 deletions stream/src/test/java/com/annimon/stream/CollectorsTest.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
package com.annimon.stream;

import com.annimon.stream.function.BinaryOperator;
import com.annimon.stream.function.Function;
import com.annimon.stream.function.IntSupplier;
import com.annimon.stream.function.Predicate;
import com.annimon.stream.function.Supplier;
import com.annimon.stream.function.ToDoubleFunction;
import com.annimon.stream.function.ToIntFunction;
import com.annimon.stream.function.ToLongFunction;
import com.annimon.stream.function.UnaryOperator;
import com.annimon.stream.function.*;
import static com.annimon.stream.test.hamcrest.CommonMatcher.hasOnlyPrivateConstructors;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -836,6 +829,47 @@ public List<Integer> apply(List<Integer> list) {
assertThat(result, instanceOf(LinkedList.class));
}

@Test
public void testTeeingAverage() {
final ToIntFunction<Integer> toInt = new ToIntFunction<Integer>() {
@Override
public int applyAsInt(Integer t) {
return t;
}
};

double result = Stream.of(1, 2, 3, 4, 5)
.collect(Collectors.teeing(
Collectors.summingInt(toInt),
Collectors.<Integer>counting(),
new BiFunction<Integer, Long, Double>() {
@Override
public Double apply(Integer sum, Long count) {
return sum / count.doubleValue();
}
}
));
assertThat(result, closeTo(3, 0.01));
}

@Test
public void testTeeingMultipleContainers() {
Map.Entry<List<Integer>, Set<Integer>> result = Stream.of(1, 2, 2, 3, 4, 1, 3, 5)
.collect(Collectors.teeing(
Collectors.<Integer>toList(),
Collectors.<Integer>toSet(),
new BiFunction<List<Integer>, Set<Integer>, Map.Entry<List<Integer>, Set<Integer>>>() {
@Override
public Map.Entry<List<Integer>, Set<Integer>> apply(
List<Integer> value1, Set<Integer> value2) {
return new AbstractMap.SimpleEntry<List<Integer>, Set<Integer>>(value1, value2);
}
}
));
assertThat(result.getKey(), contains(1, 2, 2, 3, 4, 1, 3, 5));
assertThat(result.getValue(), containsInAnyOrder(1, 2, 3, 4, 5));
}

@Test
public void testPrivateConstructor() throws Exception {
assertThat(Collectors.class, hasOnlyPrivateConstructors());
Expand Down

0 comments on commit e7e9506

Please sign in to comment.