Skip to content

Commit

Permalink
Add variants of skip().repetitions() and select().distinct() acceptin…
Browse files Browse the repository at this point in the history
…g custom comparators
  • Loading branch information
cescoffier committed Jan 4, 2021
1 parent 9f16bb4 commit c3db8c5
Show file tree
Hide file tree
Showing 8 changed files with 316 additions and 86 deletions.
8 changes: 7 additions & 1 deletion documentation/src/main/jekyll/guides/repetitions.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ Applying `.select().distinct()` on such stream produces:
IMPORTANT: Do not use `.select().distinct()` on large or infinite streams.
The operator keeps a reference on all the emitted items, and so, it could lead to memory issues if the stream contains too many distinct items.

TIP: By default, `select().distinct()` uses the `hashCode` method from the item's class.
You can pass a custom comparator for more advanced checks.

== Skipping repetitions

The `.skip().repetitions()` operator removes subsequent repetitions of an item:
Expand All @@ -37,4 +40,7 @@ If you have a stream emitting the {1, 1, 2, 3, 4, 5, 5, 6, 1, 4, 4} items.
Applying `.skip().repetitions()` on such stream produces:
{1, 2, 3, 4, 5, 6, 1, 4}.

Unlike `.skip().repetitions())`, you can use this operator on large or infinite streams.
Unlike `.skip().repetitions())`, you can use this operator on large or infinite streams.

TIP: By default, `skip().repetitions()` uses the `equals` method from the item's class.
You can pass a custom comparator for more advanced checks.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull;

import java.time.Duration;
import java.util.Comparator;
import java.util.function.Function;
import java.util.function.Predicate;

Expand Down Expand Up @@ -207,11 +208,32 @@ public Multi<T> when(Function<? super T, Uni<Boolean>> predicate) {
*
* @return the resulting {@link Multi}.
* @see MultiSkip#repetitions()
* @see #distinct(Comparator)
*/
public Multi<T> distinct() {
return Infrastructure.onMultiCreation(new MultiDistinctOp<>(upstream));
}

// TODO distinct and distinctUntilChanged with comparator
/**
* Selects all the distinct items from the upstream.
* This methods uses the given comparator to compare the items.
* <p>
* Do NOT call this method on unbounded upstream, as it would lead to an {@link OutOfMemoryError}.
* <p>
* If the comparison throws an exception, the produced {@link Multi} fails.
* The produced {@link Multi} completes when the upstream sends the completion event.
* <p>
* Unlike {@link #distinct()} which uses a {@link java.util.HashSet} internally, this variant uses a
* {@link java.util.TreeSet} initialized with the given comparator. If the comparator is {@code null}, it uses a
* {@link java.util.HashSet} as backend.
*
* @param comparator the comparator used to compare items. If {@code null}, it will uses the item's {@code hashCode}
* method.
* @return the resulting {@link Multi}.
* @see MultiSkip#repetitions()
*/
public Multi<T> distinct(Comparator<? super T> comparator) {
return Infrastructure.onMultiCreation(new MultiDistinctOp<>(upstream, comparator));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static io.smallrye.mutiny.helpers.ParameterValidation.positiveOrZero;

import java.time.Duration;
import java.util.Comparator;
import java.util.function.Function;
import java.util.function.Predicate;

Expand Down Expand Up @@ -141,9 +142,31 @@ public Multi<T> last() {
*
* @return the resulting {@link Multi}
* @see MultiSelect#distinct()
* @see MultiSkip#repetitions(Comparator)
*/
public Multi<T> repetitions() {
return Infrastructure.onMultiCreation(new MultiDropRepetitionsOp<>(upstream));
return Infrastructure.onMultiCreation(new MultiSkipRepetitionsOp<>(upstream));
}

/**
* Skips repetitions from the upstream.
* So, if the upstream emits consecutively the same item twice, it drops the second occurrence.
* <p>
* The items are compared using the given comparator.
* <p>
* If the upstream emits a failure, the produced {@link Multi} emits a failure.
* If the comparison throws an exception, the produced {@link Multi} emits that exception as failure.
* The produces {@link Multi} completes when the upstream completes.
* <p>
* Unlike {@link MultiSelect#distinct()}, this method can be called on unbounded upstream, as it only keeps a
* reference on the last item.
*
* @return the resulting {@link Multi}
* @see MultiSelect#distinct()
* @see MultiSkip#repetitions()
*/
public Multi<T> repetitions(Comparator<? super T> comparator) {
return Infrastructure.onMultiCreation(new MultiSkipRepetitionsOp<>(upstream, comparator));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
*
* @param <T> the type of the items
*/
@SuppressWarnings({ "ReactiveStreamsSubscriberImplementation" })
@SuppressWarnings({ "ReactiveStreamsSubscriberImplementation"})
public class AssertSubscriber<T> implements Subscriber<T> {

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package io.smallrye.mutiny.operators.multi;

import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.*;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.subscription.MultiSubscriber;

/**
Expand All @@ -14,22 +13,33 @@
*/
public final class MultiDistinctOp<T> extends AbstractMultiOperator<T, T> {

private final Comparator<? super T> comparator;

public MultiDistinctOp(Multi<? extends T> upstream) {
this(upstream, null);
}

public MultiDistinctOp(Multi<? extends T> upstream, Comparator<? super T> comparator) {
super(upstream);
this.comparator = comparator;
}

@Override
public void subscribe(MultiSubscriber<? super T> actual) {
upstream.subscribe(new DistinctProcessor<>(Objects.requireNonNull(actual, "Subscriber must not be `null`")));
public void subscribe(MultiSubscriber<? super T> subscriber) {
upstream.subscribe(new DistinctProcessor<>(ParameterValidation.nonNullNpe(subscriber, "subscriber"), comparator));
}

static final class DistinctProcessor<T> extends MultiOperatorProcessor<T, T> {

final Collection<T> collection;

DistinctProcessor(MultiSubscriber<? super T> downstream) {
DistinctProcessor(MultiSubscriber<? super T> downstream, Comparator<? super T> comparator) {
super(downstream);
this.collection = new HashSet<>();
if (comparator == null) {
this.collection = new HashSet<>();
} else {
this.collection = new TreeSet<>(comparator);
}
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package io.smallrye.mutiny.operators.multi;

import static io.smallrye.mutiny.helpers.ParameterValidation.nonNullNpe;

import java.util.Comparator;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.MultiSubscriber;

/**
* Eliminates the duplicated items from the upstream.
*
* @param <T> the type of items
*/
public final class MultiSkipRepetitionsOp<T> extends AbstractMultiOperator<T, T> {

private final Comparator<? super T> comparator;

public MultiSkipRepetitionsOp(Multi<T> upstream) {
this(upstream, null);
}

public MultiSkipRepetitionsOp(Multi<T> upstream, Comparator<? super T> comparator) {
super(upstream);
this.comparator = comparator;
}

@Override
public void subscribe(MultiSubscriber<? super T> subscriber) {
nonNullNpe(subscriber, "subscriber");
upstream.subscribe().withSubscriber(new MultiSkipRepetitionsProcessor<>(subscriber, comparator));
}

static final class MultiSkipRepetitionsProcessor<T> extends MultiOperatorProcessor<T, T> {

private final Comparator<? super T> comparator;
private T last;

public MultiSkipRepetitionsProcessor(MultiSubscriber<? super T> subscriber, Comparator<? super T> comparator) {
super(subscriber);
if (comparator == null) {
this.comparator = (a, b) -> a.equals(b) ? 0 : 1;
} else {
this.comparator = comparator;
}

}

@Override
public void onItem(T t) {
if (isDone()) {
return;
}
try {
if (last == null || comparator.compare(last, t) != 0) {
last = t;
downstream.onItem(t);
} else {
// Request the next one, as that item is dropped.
request(1);
}
} catch (Exception e) {
onFailure(e);
}
}

@Override
public void onFailure(Throwable t) {
super.onFailure(t);
last = null;
}

@Override
public void onCompletion() {
super.onCompletion();
last = null;
}

@Override
public void cancel() {
super.cancel();
last = null;
}
}

}
Loading

0 comments on commit c3db8c5

Please sign in to comment.