Skip to content

Commit

Permalink
2.x: distinctUntilChanged to store the selected key instead of the va…
Browse files Browse the repository at this point in the history
…lue (#4747)

* 2.x: distinctUntilChanged to store the selected key instead of the value

* Fix null test and whitespaces
  • Loading branch information
akarnokd authored Oct 21, 2016
1 parent a779687 commit 0d6c8e3
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 117 deletions.
8 changes: 4 additions & 4 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.reactivex.internal.functions.*;
import io.reactivex.internal.fuseable.ScalarCallable;
import io.reactivex.internal.operators.flowable.*;
import io.reactivex.internal.operators.observable.*;
import io.reactivex.internal.operators.observable.ObservableFromPublisher;
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.internal.subscribers.*;
import io.reactivex.internal.util.*;
Expand Down Expand Up @@ -7266,7 +7266,7 @@ public final <K> Flowable<T> distinct(Function<? super T, K> keySelector,
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> distinctUntilChanged() {
return new FlowableDistinctUntilChanged<T>(this, Functions.equalsPredicate());
return distinctUntilChanged(Functions.identity());
}

/**
Expand Down Expand Up @@ -7294,7 +7294,7 @@ public final Flowable<T> distinctUntilChanged() {
@SchedulerSupport(SchedulerSupport.NONE)
public final <K> Flowable<T> distinctUntilChanged(Function<? super T, K> keySelector) {
ObjectHelper.requireNonNull(keySelector, "keySelector is null");
return new FlowableDistinctUntilChanged<T>(this, Functions.equalsPredicate(keySelector));
return RxJavaPlugins.onAssembly(new FlowableDistinctUntilChanged<T, K>(this, keySelector, ObjectHelper.equalsPredicate()));
}

/**
Expand All @@ -7321,7 +7321,7 @@ public final <K> Flowable<T> distinctUntilChanged(Function<? super T, K> keySele
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> distinctUntilChanged(BiPredicate<? super T, ? super T> comparer) {
ObjectHelper.requireNonNull(comparer, "comparer is null");
return RxJavaPlugins.onAssembly(new FlowableDistinctUntilChanged<T>(this, comparer));
return RxJavaPlugins.onAssembly(new FlowableDistinctUntilChanged<T, T>(this, Functions.<T>identity(), comparer));
}

/**
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6333,7 +6333,7 @@ public final <K> Observable<T> distinct(Function<? super T, K> keySelector, Call
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> distinctUntilChanged() {
return new ObservableDistinctUntilChanged<T>(this, Functions.equalsPredicate());
return distinctUntilChanged(Functions.identity());
}

/**
Expand All @@ -6357,7 +6357,7 @@ public final Observable<T> distinctUntilChanged() {
@SchedulerSupport(SchedulerSupport.NONE)
public final <K> Observable<T> distinctUntilChanged(Function<? super T, K> keySelector) {
ObjectHelper.requireNonNull(keySelector, "keySelector is null");
return new ObservableDistinctUntilChanged<T>(this, Functions.equalsPredicate(keySelector));
return RxJavaPlugins.onAssembly(new ObservableDistinctUntilChanged<T, K>(this, keySelector, ObjectHelper.equalsPredicate()));
}

/**
Expand All @@ -6380,7 +6380,7 @@ public final <K> Observable<T> distinctUntilChanged(Function<? super T, K> keySe
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> distinctUntilChanged(BiPredicate<? super T, ? super T> comparer) {
ObjectHelper.requireNonNull(comparer, "comparer is null");
return RxJavaPlugins.onAssembly(new ObservableDistinctUntilChanged<T>(this, comparer));
return RxJavaPlugins.onAssembly(new ObservableDistinctUntilChanged<T, T>(this, Functions.<T>identity(), comparer));
}

/**
Expand Down
26 changes: 0 additions & 26 deletions src/main/java/io/reactivex/internal/functions/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -639,32 +639,6 @@ public static <T> Function<List<T>, List<T>> listSorter(final Comparator<? super
return new ListSorter<T>(comparator);
}

static final BiPredicate<Object, Object> DEFAULT_EQUALS_PREDICATE = equalsPredicate(Functions.identity());

@SuppressWarnings("unchecked")
public static <T> BiPredicate<T, T> equalsPredicate() {
return (BiPredicate<T, T>)DEFAULT_EQUALS_PREDICATE;
}

static final class KeyedEqualsPredicate<T, K> implements BiPredicate<T, T> {
final Function<? super T, K> keySelector;

KeyedEqualsPredicate(Function<? super T, K> keySelector) {
this.keySelector = keySelector;
}

@Override
public boolean test(T t1, T t2) throws Exception {
K k1 = ObjectHelper.requireNonNull(keySelector.apply(t1), "The keySelector returned a null key");
K k2 = ObjectHelper.requireNonNull(keySelector.apply(t2), "The keySelector returned a null key");
return ObjectHelper.equals(k1, k2);
}
}

public static <T, K> BiPredicate<T, T> equalsPredicate(Function<? super T, K> keySelector) {
return new KeyedEqualsPredicate<T, K>(keySelector);
}

public static final Consumer<Subscription> REQUEST_MAX = new Consumer<Subscription>() {
@Override
public void accept(Subscription t) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,49 @@

import org.reactivestreams.*;

import io.reactivex.functions.BiPredicate;
import io.reactivex.functions.*;
import io.reactivex.internal.fuseable.ConditionalSubscriber;
import io.reactivex.internal.subscribers.*;

public final class FlowableDistinctUntilChanged<T> extends AbstractFlowableWithUpstream<T, T> {
public final class FlowableDistinctUntilChanged<T, K> extends AbstractFlowableWithUpstream<T, T> {

final BiPredicate<? super T, ? super T> comparer;
final Function<? super T, K> keySelector;

public FlowableDistinctUntilChanged(Publisher<T> source, BiPredicate<? super T, ? super T> comparer) {
final BiPredicate<? super K, ? super K> comparer;

public FlowableDistinctUntilChanged(Publisher<T> source, Function<? super T, K> keySelector, BiPredicate<? super K, ? super K> comparer) {
super(source);
this.keySelector = keySelector;
this.comparer = comparer;
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
if (s instanceof ConditionalSubscriber) {
ConditionalSubscriber<? super T> cs = (ConditionalSubscriber<? super T>) s;
source.subscribe(new DistinctUntilChangedConditionalSubscriber<T>(cs, comparer));
source.subscribe(new DistinctUntilChangedConditionalSubscriber<T, K>(cs, keySelector, comparer));
} else {
source.subscribe(new DistinctUntilChangedSubscriber<T>(s, comparer));
source.subscribe(new DistinctUntilChangedSubscriber<T, K>(s, keySelector, comparer));
}
}

static final class DistinctUntilChangedSubscriber<T> extends BasicFuseableSubscriber<T, T>
static final class DistinctUntilChangedSubscriber<T, K> extends BasicFuseableSubscriber<T, T>
implements ConditionalSubscriber<T> {

final BiPredicate<? super T, ? super T> comparer;

T last;
final Function<? super T, K> keySelector;

final BiPredicate<? super K, ? super K> comparer;

K last;

boolean hasValue;

DistinctUntilChangedSubscriber(Subscriber<? super T> actual,
BiPredicate<? super T, ? super T> comparer) {
Function<? super T, K> keySelector,
BiPredicate<? super K, ? super K> comparer) {
super(actual);
this.keySelector = keySelector;
this.comparer = comparer;
}

Expand All @@ -70,23 +78,25 @@ public boolean tryOnNext(T t) {
return true;
}

if (hasValue) {
boolean equal;
try {
equal = comparer.test(last, t);
} catch (Throwable ex) {
fail(ex);
return false;
}
last = t;
if (equal) {
return false;
K key;

try {
key = keySelector.apply(t);
if (hasValue) {
boolean equal = comparer.test(last, key);
last = key;
if (equal) {
return false;
}
} else {
hasValue = true;
last = key;
}
actual.onNext(t);
return true;
} catch (Throwable ex) {
fail(ex);
return true;
}
hasValue = true;
last = t;

actual.onNext(t);
return true;
}
Expand All @@ -103,17 +113,18 @@ public T poll() throws Exception {
if (v == null) {
return null;
}
K key = keySelector.apply(v);
if (!hasValue) {
hasValue = true;
last = v;
last = key;
return v;
}

if (!comparer.test(last, v)) {
last = v;
if (!comparer.test(last, key)) {
last = key;
return v;
}
last = v;
last = key;
if (sourceMode != SYNC) {
s.request(1);
}
Expand All @@ -122,17 +133,21 @@ public T poll() throws Exception {

}

static final class DistinctUntilChangedConditionalSubscriber<T> extends BasicFuseableConditionalSubscriber<T, T> {
static final class DistinctUntilChangedConditionalSubscriber<T, K> extends BasicFuseableConditionalSubscriber<T, T> {

final Function<? super T, K> keySelector;

final BiPredicate<? super T, ? super T> comparer;
final BiPredicate<? super K, ? super K> comparer;

T last;
K last;

boolean hasValue;

DistinctUntilChangedConditionalSubscriber(ConditionalSubscriber<? super T> actual,
BiPredicate<? super T, ? super T> comparer) {
Function<? super T, K> keySelector,
BiPredicate<? super K, ? super K> comparer) {
super(actual);
this.keySelector = keySelector;
this.comparer = comparer;
}

Expand All @@ -152,20 +167,27 @@ public boolean tryOnNext(T t) {
return actual.tryOnNext(t);
}

if (hasValue) {
boolean equal;
try {
equal = comparer.test(last, t);
} catch (Throwable ex) {
fail(ex);
return false;
K key;

try {
key = keySelector.apply(t);
if (hasValue) {
boolean equal = comparer.test(last, key);
last = key;
if (equal) {
return false;
}
} else {
hasValue = true;
last = key;
}
last = t;
return !equal && actual.tryOnNext(t);
} catch (Throwable ex) {
fail(ex);
return true;
}
hasValue = true;
last = t;
return actual.tryOnNext(t);

actual.onNext(t);
return true;
}

@Override
Expand All @@ -180,16 +202,18 @@ public T poll() throws Exception {
if (v == null) {
return null;
}
K key = keySelector.apply(v);
if (!hasValue) {
hasValue = true;
last = v;
last = key;
return v;
}
if (!comparer.test(last, v)) {
last = v;

if (!comparer.test(last, key)) {
last = key;
return v;
}
last = v;
last = key;
if (sourceMode != SYNC) {
s.request(1);
}
Expand Down
Loading

0 comments on commit 0d6c8e3

Please sign in to comment.