diff --git a/documentation/src/main/jekyll/guides/repetitions.adoc b/documentation/src/main/jekyll/guides/repetitions.adoc index 495d6ba52..b9b798eb9 100644 --- a/documentation/src/main/jekyll/guides/repetitions.adoc +++ b/documentation/src/main/jekyll/guides/repetitions.adoc @@ -24,6 +24,9 @@ Applying `.select().distinct()` on such stream produces: IMPORTANT: Do not use `.select().distinct()` on large or infinite streams. The operator keeps a reference on all the emitted items, and so, it could lead to memory issues if the stream contains too many distinct items. +TIP: By default, `select().distinct()` uses the `hashCode` method from the item's class. +You can pass a custom comparator for more advanced checks. + == Skipping repetitions The `.skip().repetitions()` operator removes subsequent repetitions of an item: @@ -37,4 +40,7 @@ If you have a stream emitting the {1, 1, 2, 3, 4, 5, 5, 6, 1, 4, 4} items. Applying `.skip().repetitions()` on such stream produces: {1, 2, 3, 4, 5, 6, 1, 4}. -Unlike `.skip().repetitions())`, you can use this operator on large or infinite streams. \ No newline at end of file +Unlike `.skip().repetitions())`, you can use this operator on large or infinite streams. + +TIP: By default, `skip().repetitions()` uses the `equals` method from the item's class. +You can pass a custom comparator for more advanced checks. \ No newline at end of file diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiSelect.java b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiSelect.java index 4726a2c89..92a14d319 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiSelect.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiSelect.java @@ -3,6 +3,7 @@ import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull; import java.time.Duration; +import java.util.Comparator; import java.util.function.Function; import java.util.function.Predicate; @@ -207,11 +208,32 @@ public Multi when(Function> predicate) { * * @return the resulting {@link Multi}. * @see MultiSkip#repetitions() + * @see #distinct(Comparator) */ public Multi distinct() { return Infrastructure.onMultiCreation(new MultiDistinctOp<>(upstream)); } - // TODO distinct and distinctUntilChanged with comparator + /** + * Selects all the distinct items from the upstream. + * This methods uses the given comparator to compare the items. + *

+ * Do NOT call this method on unbounded upstream, as it would lead to an {@link OutOfMemoryError}. + *

+ * If the comparison throws an exception, the produced {@link Multi} fails. + * The produced {@link Multi} completes when the upstream sends the completion event. + *

+ * Unlike {@link #distinct()} which uses a {@link java.util.HashSet} internally, this variant uses a + * {@link java.util.TreeSet} initialized with the given comparator. If the comparator is {@code null}, it uses a + * {@link java.util.HashSet} as backend. + * + * @param comparator the comparator used to compare items. If {@code null}, it will uses the item's {@code hashCode} + * method. + * @return the resulting {@link Multi}. + * @see MultiSkip#repetitions() + */ + public Multi distinct(Comparator comparator) { + return Infrastructure.onMultiCreation(new MultiDistinctOp<>(upstream, comparator)); + } } diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiSkip.java b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiSkip.java index f8a8faefd..5c829a8dc 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiSkip.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiSkip.java @@ -4,6 +4,7 @@ import static io.smallrye.mutiny.helpers.ParameterValidation.positiveOrZero; import java.time.Duration; +import java.util.Comparator; import java.util.function.Function; import java.util.function.Predicate; @@ -141,9 +142,31 @@ public Multi last() { * * @return the resulting {@link Multi} * @see MultiSelect#distinct() + * @see MultiSkip#repetitions(Comparator) */ public Multi repetitions() { - return Infrastructure.onMultiCreation(new MultiDropRepetitionsOp<>(upstream)); + return Infrastructure.onMultiCreation(new MultiSkipRepetitionsOp<>(upstream)); + } + + /** + * Skips repetitions from the upstream. + * So, if the upstream emits consecutively the same item twice, it drops the second occurrence. + *

+ * The items are compared using the given comparator. + *

+ * If the upstream emits a failure, the produced {@link Multi} emits a failure. + * If the comparison throws an exception, the produced {@link Multi} emits that exception as failure. + * The produces {@link Multi} completes when the upstream completes. + *

+ * Unlike {@link MultiSelect#distinct()}, this method can be called on unbounded upstream, as it only keeps a + * reference on the last item. + * + * @return the resulting {@link Multi} + * @see MultiSelect#distinct() + * @see MultiSkip#repetitions() + */ + public Multi repetitions(Comparator comparator) { + return Infrastructure.onMultiCreation(new MultiSkipRepetitionsOp<>(upstream, comparator)); } /** diff --git a/implementation/src/main/java/io/smallrye/mutiny/helpers/test/AssertSubscriber.java b/implementation/src/main/java/io/smallrye/mutiny/helpers/test/AssertSubscriber.java index 014d2bdc2..9cb6a6507 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/helpers/test/AssertSubscriber.java +++ b/implementation/src/main/java/io/smallrye/mutiny/helpers/test/AssertSubscriber.java @@ -19,7 +19,7 @@ * * @param the type of the items */ -@SuppressWarnings({ "ReactiveStreamsSubscriberImplementation" }) +@SuppressWarnings({ "ReactiveStreamsSubscriberImplementation"}) public class AssertSubscriber implements Subscriber { /** diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDistinctOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDistinctOp.java index 76139aece..a5019c845 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDistinctOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDistinctOp.java @@ -1,10 +1,9 @@ package io.smallrye.mutiny.operators.multi; -import java.util.Collection; -import java.util.HashSet; -import java.util.Objects; +import java.util.*; import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.helpers.ParameterValidation; import io.smallrye.mutiny.subscription.MultiSubscriber; /** @@ -14,22 +13,33 @@ */ public final class MultiDistinctOp extends AbstractMultiOperator { + private final Comparator comparator; + public MultiDistinctOp(Multi upstream) { + this(upstream, null); + } + + public MultiDistinctOp(Multi upstream, Comparator comparator) { super(upstream); + this.comparator = comparator; } @Override - public void subscribe(MultiSubscriber actual) { - upstream.subscribe(new DistinctProcessor<>(Objects.requireNonNull(actual, "Subscriber must not be `null`"))); + public void subscribe(MultiSubscriber subscriber) { + upstream.subscribe(new DistinctProcessor<>(ParameterValidation.nonNullNpe(subscriber, "subscriber"), comparator)); } static final class DistinctProcessor extends MultiOperatorProcessor { final Collection collection; - DistinctProcessor(MultiSubscriber downstream) { + DistinctProcessor(MultiSubscriber downstream, Comparator comparator) { super(downstream); - this.collection = new HashSet<>(); + if (comparator == null) { + this.collection = new HashSet<>(); + } else { + this.collection = new TreeSet<>(comparator); + } } @Override diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDropRepetitionsOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDropRepetitionsOp.java deleted file mode 100644 index 096e1edad..000000000 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiDropRepetitionsOp.java +++ /dev/null @@ -1,67 +0,0 @@ -package io.smallrye.mutiny.operators.multi; - -import io.smallrye.mutiny.Multi; -import io.smallrye.mutiny.subscription.MultiSubscriber; - -/** - * Eliminates the duplicated items from the upstream. - * - * @param the type of items - */ -public final class MultiDropRepetitionsOp extends AbstractMultiOperator { - - public MultiDropRepetitionsOp(Multi upstream) { - super(upstream); - } - - @Override - public void subscribe(MultiSubscriber actual) { - upstream.subscribe().withSubscriber(new DistinctProcessor<>(actual)); - } - - static final class DistinctProcessor extends MultiOperatorProcessor { - - private T last; - - DistinctProcessor(MultiSubscriber downstream) { - super(downstream); - } - - @Override - public void onItem(T t) { - if (isDone()) { - return; - } - try { - if (last == null || !last.equals(t)) { - last = t; - downstream.onItem(t); - } else { - // Request the next one, as that item is dropped. - request(1); - } - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable t) { - super.onFailure(t); - last = null; - } - - @Override - public void onCompletion() { - super.onCompletion(); - last = null; - } - - @Override - public void cancel() { - super.cancel(); - last = null; - } - } - -} diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSkipRepetitionsOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSkipRepetitionsOp.java new file mode 100644 index 000000000..b75c34604 --- /dev/null +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSkipRepetitionsOp.java @@ -0,0 +1,86 @@ +package io.smallrye.mutiny.operators.multi; + +import static io.smallrye.mutiny.helpers.ParameterValidation.nonNullNpe; + +import java.util.Comparator; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.subscription.MultiSubscriber; + +/** + * Eliminates the duplicated items from the upstream. + * + * @param the type of items + */ +public final class MultiSkipRepetitionsOp extends AbstractMultiOperator { + + private final Comparator comparator; + + public MultiSkipRepetitionsOp(Multi upstream) { + this(upstream, null); + } + + public MultiSkipRepetitionsOp(Multi upstream, Comparator comparator) { + super(upstream); + this.comparator = comparator; + } + + @Override + public void subscribe(MultiSubscriber subscriber) { + nonNullNpe(subscriber, "subscriber"); + upstream.subscribe().withSubscriber(new MultiSkipRepetitionsProcessor<>(subscriber, comparator)); + } + + static final class MultiSkipRepetitionsProcessor extends MultiOperatorProcessor { + + private final Comparator comparator; + private T last; + + public MultiSkipRepetitionsProcessor(MultiSubscriber subscriber, Comparator comparator) { + super(subscriber); + if (comparator == null) { + this.comparator = (a, b) -> a.equals(b) ? 0 : 1; + } else { + this.comparator = comparator; + } + + } + + @Override + public void onItem(T t) { + if (isDone()) { + return; + } + try { + if (last == null || comparator.compare(last, t) != 0) { + last = t; + downstream.onItem(t); + } else { + // Request the next one, as that item is dropped. + request(1); + } + } catch (Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(Throwable t) { + super.onFailure(t); + last = null; + } + + @Override + public void onCompletion() { + super.onCompletion(); + last = null; + } + + @Override + public void cancel() { + super.cancel(); + last = null; + } + } + +} diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiDistinctTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiDistinctTest.java index c2e2f918f..ff8746154 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiDistinctTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiDistinctTest.java @@ -3,6 +3,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; import java.io.IOException; import java.time.Duration; @@ -13,6 +14,8 @@ import java.util.function.Consumer; import org.junit.jupiter.api.Test; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.TestException; @@ -31,6 +34,43 @@ public void testDistinct() { .assertItems(1, 2, 3, 4); } + @Test + public void testDistinctWithComparator() { + Multi.createFrom().items(1, 2, 3, 4, 2, 4, 2, 4) + .select().distinct(Integer::compareTo) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertCompleted() + .assertItems(1, 2, 3, 4); + } + + @Test + public void testDistinctWithNullComparator() { + Multi.createFrom().items(1, 2, 3, 4, 2, 4, 2, 4) + .select().distinct(null) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertCompleted() + .assertItems(1, 2, 3, 4); + } + + @Test + public void testDistinctWithComparatorReturningAlways0() { + Multi.createFrom().items(1, 2, 3, 4, 2, 4, 2, 4) + .select().distinct((a, b) -> 0) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertCompleted() + .assertItems(1); + } + + @Test + public void testDistinctWithComparatorReturningAlways1() { + //noinspection ComparatorMethodParameterNotUsed + Multi.createFrom().items(1, 2, 3, 4, 2, 4, 2, 4) + .select().distinct((a, b) -> 1) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertCompleted() + .assertItems(1, 2, 3, 4, 2, 4, 2, 4); + } + @SuppressWarnings("deprecation") @Test public void testDistinctDeprecated() { @@ -49,6 +89,14 @@ public void testDistinctWithUpstreamFailure() { .assertFailedWith(IOException.class, "boom"); } + @Test + public void testDistinctWithComparatorWithUpstreamFailure() { + Multi.createFrom(). failure(new IOException("boom")) + .select().distinct(Integer::compareTo) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertFailedWith(IOException.class, "boom"); + } + @SuppressWarnings("deprecation") @Test public void testDistinctWithUpstreamFailureDeprecated() { @@ -68,7 +116,7 @@ public void testThatNullSubscriberAreRejectedDistinct() { @SuppressWarnings("ConstantConditions") @Test - public void testThatNullSubscriberAreRejectedWithoutRepetitions() { + public void testThatNullSubscriberAreRejectedSkipRepetitions() { assertThrows(NullPointerException.class, () -> Multi.createFrom().items(1, 2, 3, 4, 2, 4, 2, 4) .skip().repetitions() .subscribe(null)); @@ -84,7 +132,7 @@ public void testDistinctOnAStreamWithoutDuplicates() { } @Test - public void testWithoutRepetitionsWithUpstreamFailure() { + public void testSkipRepetitionsWithUpstreamFailure() { Multi.createFrom(). failure(new IOException("boom")) .skip().repetitions() .subscribe().withSubscriber(AssertSubscriber.create(10)) @@ -92,7 +140,7 @@ public void testWithoutRepetitionsWithUpstreamFailure() { } @Test - public void testWithoutRepetitions() { + public void testSkipRepetitions() { Multi.createFrom().items(1, 2, 3, 4, 4, 2, 2, 4, 1, 1, 2, 4) .skip().repetitions() .subscribe().withSubscriber(AssertSubscriber.create(10)) @@ -100,6 +148,34 @@ public void testWithoutRepetitions() { .assertItems(1, 2, 3, 4, 2, 4, 1, 2, 4); } + @Test + public void testSkipRepetitionsWithComparator() { + Multi.createFrom().items(1, 2, 3, 4, 4, 2, 2, 4, 1, 1, 2, 4) + .skip().repetitions(Integer::compareTo) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertCompleted() + .assertItems(1, 2, 3, 4, 2, 4, 1, 2, 4); + } + + @Test + public void testSkipRepetitionsWithComparatorAlwaysReturning0() { + Multi.createFrom().items(1, 2, 3, 4, 4, 2, 2, 4, 1, 1, 2, 4) + .skip().repetitions((a, b) -> 0) + .subscribe().withSubscriber(AssertSubscriber.create(10)) + .assertCompleted() + .assertItems(1); + } + + @Test + public void testSkipRepetitionsWithComparatorAlwaysReturning1() { + //noinspection ComparatorMethodParameterNotUsed + Multi.createFrom().items(1, 2, 3, 4, 4, 2, 2, 4, 1, 1, 2, 4) + .skip().repetitions((a, b) -> 1) + .subscribe().withSubscriber(AssertSubscriber.create(20)) + .assertCompleted() + .assertItems(1, 2, 3, 4, 4, 2, 2, 4, 1, 1, 2, 4); + } + @SuppressWarnings("deprecation") @Test public void testDroppedRepetitionsDeprecated() { @@ -111,7 +187,7 @@ public void testDroppedRepetitionsDeprecated() { } @Test - public void testWithoutRepetitionsWithCancellation() { + public void testSkipRepetitionsWithCancellation() { AtomicLong count = new AtomicLong(); AtomicBoolean cancelled = new AtomicBoolean(); AssertSubscriber subscriber = Multi.createFrom().ticks().every(Duration.ofMillis(1)) @@ -132,7 +208,7 @@ public void testWithoutRepetitionsWithCancellation() { } @Test - public void testWithoutRepetitionsWithImmediateCancellation() { + public void testSkipRepetitionsWithImmediateCancellation() { AtomicLong count = new AtomicLong(); AtomicBoolean cancelled = new AtomicBoolean(); Multi.createFrom().ticks().every(Duration.ofMillis(1)) @@ -152,7 +228,7 @@ public void testWithoutRepetitionsWithImmediateCancellation() { } @Test - public void testWithoutRepetitionsOnAStreamWithoutDuplicates() { + public void testSkipRepetitionsOnAStreamWithoutDuplicates() { Multi.createFrom().range(1, 5) .skip().repetitions() .subscribe().withSubscriber(AssertSubscriber.create(10)) @@ -180,7 +256,7 @@ public void testNoEmissionAfterCancellation() { } @Test - public void testDistinctExceptionInComparator() { + public void testDistinctExceptionInHashCode() { AtomicReference> emitter = new AtomicReference<>(); AssertSubscriber subscriber = Multi.createFrom().emitter( (Consumer>) emitter::set) @@ -197,7 +273,24 @@ public void testDistinctExceptionInComparator() { } @Test - public void testWithoutRepetitionsExceptionInComparator() { + public void testDistinctExceptionInComparator() { + AtomicReference> emitter = new AtomicReference<>(); + AssertSubscriber subscriber = Multi.createFrom().emitter( + (Consumer>) emitter::set) + .select().distinct((a, b) -> { + throw new TestException("boom"); + }) + .subscribe().withSubscriber(AssertSubscriber.create(10)); + + subscriber.assertSubscribed() + .assertNotTerminated(); + + emitter.get().emit(1).emit(2).complete(); + subscriber.assertFailedWith(TestException.class, "boom"); + } + + @Test + public void testSkipRepetitionsExceptionInEquals() { AtomicReference> emitter = new AtomicReference<>(); AssertSubscriber subscriber = Multi.createFrom().emitter( (Consumer>) emitter::set) @@ -215,6 +308,63 @@ public void testWithoutRepetitionsExceptionInComparator() { .assertFailedWith(TestException.class, "boom"); } + @Test + public void testSkipRepetitionsExceptionInComparator() { + AtomicReference> emitter = new AtomicReference<>(); + AssertSubscriber subscriber = Multi.createFrom().emitter( + (Consumer>) emitter::set) + .skip().repetitions((a, b) -> { + throw new TestException("boom"); + }) + .subscribe().withSubscriber(AssertSubscriber.create(10)); + + subscriber.assertSubscribed() + .assertNotTerminated(); + + emitter.get().emit(1).emit(2).complete(); + subscriber + .await() + .assertFailedWith(TestException.class, "boom"); + } + + @Test + public void testOnItemAfterCancellation() { + AtomicReference> ref = new AtomicReference<>(); + AbstractMulti upstream = new AbstractMulti() { + @Override + public void subscribe(Subscriber subscriber) { + subscriber.onSubscribe(mock(Subscription.class)); + ref.set(subscriber); + } + }; + + upstream + .select().distinct() + .subscribe().withSubscriber(AssertSubscriber.create(1)) + .run(() -> ref.get().onNext(1)) + .assertItems(1) + .request(1) + .run(() -> ref.get().onNext(1)) + .run(() -> ref.get().onNext(3)) + .assertItems(1, 3) + .cancel() + .run(() -> ref.get().onNext(4)) + .assertItems(1, 3); + + upstream + .skip().repetitions() + .subscribe().withSubscriber(AssertSubscriber.create(1)) + .run(() -> ref.get().onNext(1)) + .assertItems(1) + .request(1) + .run(() -> ref.get().onNext(1)) + .run(() -> ref.get().onNext(3)) + .assertItems(1, 3) + .cancel() + .run(() -> ref.get().onNext(4)) + .assertItems(1, 3); + } + private static class BadlyComparableStuffOnHashCode { @SuppressWarnings("EqualsWhichDoesntCheckParameterClass")