Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conflict resolution when collecting into a Map #880

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -164,6 +165,38 @@ public <K, V> Uni<Map<K, V>> asMap(
return collector(upstream, Collectors.toMap(actualKM, actualVM), false);
}

/**
* Produces an {@link Uni} emitting a {@link Map} of <code>key -&gt; mapped item</code> for each item emitted by
* this {@link Multi}. The collected map is emitted by the produced {@link Uni} when the {@link Multi} fires the
* completion event.
* <p>
* The key is extracted from each item by applying the {@code keyMapper} function.
* In case of conflict {@code mergeFunction} is used to choose which item should be emitted.
* The value is computed by applying the {@code valueMapper} function.
*
jponge marked this conversation as resolved.
Show resolved Hide resolved
* @param keyMapper a {@link Function} to map item to a key for the {@link Map}. Must not be {@code null},
* must not produce {@code null}
* @param valueMapper a {@link Function} to map item to a value for the {@link Map}. Must not be {@code null},
* must not produce {@code null}
* @param mergeFunction a {@link BinaryOperator} used to resolve collisions between values associated
* with the same key. Must not be {@code null}.
* In case it returns null the owner key will be removed from the {@link Map}
* @param <K> the type of the key extracted from each item emitted by this {@link Multi}
* @param <V> the type of the value extracted from each item emitted by this {@link Multi}
* @return a {@link Uni} emitting an item with the collected {@link Map}. The uni emits the item when this
* {@link Multi} completes
*/
@CheckReturnValue
public <K, V> Uni<Map<K, V>> asMap(
Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends V> valueMapper,
BinaryOperator<V> mergeFunction) {
Function<? super T, ? extends K> actualKM = Infrastructure.decorate(nonNull(keyMapper, "keyMapper"));
Function<? super T, ? extends V> actualVM = Infrastructure.decorate(nonNull(valueMapper, "valueMapper"));
BinaryOperator<V> actualMF = Infrastructure.decorate(nonNull(mergeFunction, "mergeFunction"));
return collector(upstream, Collectors.toMap(actualKM, actualVM, actualMF), false);
}

/**
* Produces an {@link Uni} emitting a {@link Map} of <code>key -&gt; Collection of mapped values</code> for each
* item emitted by this {@link Multi}. The collected map is emitted by the produced {@link Uni} when the
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.smallrye.mutiny.operators;

import static org.assertj.core.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.io.IOException;
import java.time.Duration;
Expand All @@ -13,6 +15,7 @@
import java.util.stream.Collector;
import java.util.stream.Collectors;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import io.smallrye.mutiny.Multi;
Expand Down Expand Up @@ -215,6 +218,67 @@ public void testCollectIntoMapWithKeyAndValueMappers() {
entry("matt", "MATT"));
}

@Test
public void testCollectIntoMapWithNullMergeFunction() {

IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> Multi.createFrom().items(
new TestObject("key1", 5),
new TestObject("key2", 3),
new TestObject("key3", 2),
new TestObject("key1", 8))
.collect().asMap(
TestObject::getKey,
Function.identity(),
null));

assertEquals("`mergeFunction` must not be `null`", exception.getMessage());
}

@Test
public void testCollectIntoMapWithConflicts() {
UniAssertSubscriber<Map<String, TestObject>> subscriber = Multi.createFrom().items(
new TestObject("key1", 5),
new TestObject("key2", 3),
new TestObject("key3", 2),
new TestObject("key1", 8))
.collect().asMap(
TestObject::getKey,
Function.identity(),
(test1, test2) -> test1.addValue(test2.value))
.subscribe().withSubscriber(UniAssertSubscriber.create())
.assertCompleted();

assertThat(subscriber.getItem())
.hasSize(3)
.contains(
entry("key1", new TestObject("key1", 13)),
entry("key2", new TestObject("key2", 3)),
entry("key3", new TestObject("key3", 2)));
}

@Test
public void testCollectIntoMapWithMergeFunctionReturningNull() {
UniAssertSubscriber<Map<String, TestObject>> subscriber = Multi.createFrom().items(
new TestObject("key1", 5),
new TestObject("key2", 3),
new TestObject("key3", 2),
new TestObject("key1", 8))
.collect().asMap(
TestObject::getKey,
Function.identity(),
(test1, test2) -> null)
.subscribe().withSubscriber(UniAssertSubscriber.create())
.assertCompleted();

assertThat(subscriber.getItem())
.hasSize(2)
.contains(
entry("key2", new TestObject("key2", 3)),
entry("key3", new TestObject("key3", 2)));
}

@Test
public void testCollectAsMapWithEmpty() {
UniAssertSubscriber<Map<String, Person>> subscriber = Multi.createFrom().<Person> empty()
Expand Down Expand Up @@ -419,4 +483,46 @@ public int hashCode() {

}

static class TestObject {

public String key;
public Integer value;

public TestObject(String key, Integer value) {
this.key = key;
this.value = value;
}

public String getKey() {
return this.key;
}

public Integer getValue() {
return this.value;
}

public TestObject addValue(Integer value) {
this.value += value;
return this;
}

public String toString() {
return String.format("[key = %s, value = %s]", this.key, this.value);
}

public boolean equals(Object object) {
if(object instanceof TestObject) {
TestObject test = (TestObject) object;
return test.key.equals(this.key) &&
test.value.equals(this.value);
}

return false;
}

public int hashCode() {
return Objects.hash(this.key, this.value);
}
}

}