Skip to content

Commit

Permalink
Merge pull request #575 from zsxwing/sequence-equal
Browse files Browse the repository at this point in the history
Reimplement the 'SequenceEqual' operator
  • Loading branch information
benjchristensen committed Dec 8, 2013
2 parents c53a473 + b753147 commit c44d7cb
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 21 deletions.
24 changes: 14 additions & 10 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import rx.operators.OperationRetry;
import rx.operators.OperationSample;
import rx.operators.OperationScan;
import rx.operators.OperationSequenceEqual;
import rx.operators.OperationSkip;
import rx.operators.OperationSkipLast;
import rx.operators.OperationSkipUntil;
Expand Down Expand Up @@ -2296,31 +2297,34 @@ public static <T> Observable<T> from(Future<? extends T> future, long timeout, T
}

/**
* Returns an Observable that emits Boolean values that indicate whether the
* pairs of items emitted by two source Observables are equal.
* Returns an Observable that emits a Boolean value that indicate
* whether two sequences are equal by comparing the elements pairwise.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/sequenceEqual.png">
*
* @param first the first Observable to compare
* @param second the second Observable to compare
* @param <T> the type of items emitted by each Observable
* @return an Observable that emits Booleans that indicate whether the
* corresponding items emitted by the source Observables are equal
* @return an Observable that emits a Boolean value that indicate
* whether two sequences are equal by comparing the elements pairwise.
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#sequenceequal">RxJava Wiki: sequenceEqual()</a>
*/
public static <T> Observable<Boolean> sequenceEqual(Observable<? extends T> first, Observable<? extends T> second) {
return sequenceEqual(first, second, new Func2<T, T, Boolean>() {
@Override
public Boolean call(T first, T second) {
if(first == null) {
return second == null;
}
return first.equals(second);
}
});
}

/**
* Returns an Observable that emits Boolean values that indicate whether the
* pairs of items emitted by two source Observables are equal based on the
* results of a specified equality function.
* Returns an Observable that emits a Boolean value that indicate
* whether two sequences are equal by comparing the elements pairwise
* based on the results of a specified equality function.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/sequenceEqual.png">
*
Expand All @@ -2329,12 +2333,12 @@ public Boolean call(T first, T second) {
* @param equality a function used to compare items emitted by both
* Observables
* @param <T> the type of items emitted by each Observable
* @return an Observable that emits Booleans that indicate whether the
* corresponding items emitted by the source Observables are equal
* @return an Observable that emits a Boolean value that indicate
* whether two sequences are equal by comparing the elements pairwise.
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#sequenceequal">RxJava Wiki: sequenceEqual()</a>
*/
public static <T> Observable<Boolean> sequenceEqual(Observable<? extends T> first, Observable<? extends T> second, Func2<? super T, ? super T, Boolean> equality) {
return zip(first, second, equality);
return OperationSequenceEqual.sequenceEqual(first, second, equality);
}

/**
Expand Down
73 changes: 73 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationSequenceEqual.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import static rx.Observable.concat;
import static rx.Observable.from;
import static rx.Observable.zip;
import rx.Notification;
import rx.Observable;
import rx.util.functions.Func1;
import rx.util.functions.Func2;
import rx.util.functions.Functions;

/**
* Returns an Observable that emits a Boolean value that indicate whether two
* sequences are equal by comparing the elements pairwise.
*/
public class OperationSequenceEqual {

public static <T> Observable<Boolean> sequenceEqual(
Observable<? extends T> first, Observable<? extends T> second,
final Func2<? super T, ? super T, Boolean> equality) {
Observable<Notification<T>> firstObservable = concat(
first.map(new Func1<T, Notification<T>>() {

@Override
public Notification<T> call(T t1) {
return new Notification<T>(t1);
}

}), from(new Notification<T>()));

Observable<Notification<T>> secondObservable = concat(
second.map(new Func1<T, Notification<T>>() {

@Override
public Notification<T> call(T t1) {
return new Notification<T>(t1);
}

}), from(new Notification<T>()));

return zip(firstObservable, secondObservable,
new Func2<Notification<T>, Notification<T>, Boolean>() {

@Override
public Boolean call(Notification<T> t1, Notification<T> t2) {
if (t1.isOnCompleted() && t2.isOnCompleted()) {
return true;
}
if (t1.isOnCompleted() || t2.isOnCompleted()) {
return false;
}
// Now t1 and t2 must be 'onNext'.
return equality.call(t1.getValue(), t2.getValue());
}

}).all(Functions.<Boolean> identity());
}
}
11 changes: 0 additions & 11 deletions rxjava-core/src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -275,17 +275,6 @@ public Integer call(Integer t1, Integer t2) {
verify(w).onNext(60);
}

@Test
public void testSequenceEqual() {
Observable<Integer> first = Observable.from(1, 2, 3);
Observable<Integer> second = Observable.from(1, 2, 4);
@SuppressWarnings("unchecked")
Observer<Boolean> result = mock(Observer.class);
Observable.sequenceEqual(first, second).subscribe(result);
verify(result, times(2)).onNext(true);
verify(result, times(1)).onNext(false);
}

@Test
public void testOnSubscribeFails() {
@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import org.junit.Test;
import org.mockito.InOrder;

import rx.Observable;
import rx.Observer;
import rx.util.functions.Func2;

public class OperationSequenceEqualTests {

@Test
public void test1() {
Observable<Boolean> observable = Observable.sequenceEqual(
Observable.from("one", "two", "three"),
Observable.from("one", "two", "three"));
verifyResult(observable, true);
}

@Test
public void test2() {
Observable<Boolean> observable = Observable.sequenceEqual(
Observable.from("one", "two", "three"),
Observable.from("one", "two", "three", "four"));
verifyResult(observable, false);
}

@Test
public void test3() {
Observable<Boolean> observable = Observable.sequenceEqual(
Observable.from("one", "two", "three", "four"),
Observable.from("one", "two", "three"));
verifyResult(observable, false);
}

@Test
public void testWithError1() {
Observable<Boolean> observable = Observable.sequenceEqual(
Observable.concat(Observable.from("one"),
Observable.<String> error(new TestException())),
Observable.from("one", "two", "three"));
verifyError(observable);
}

@Test
public void testWithError2() {
Observable<Boolean> observable = Observable.sequenceEqual(
Observable.from("one", "two", "three"),
Observable.concat(Observable.from("one"),
Observable.<String> error(new TestException())));
verifyError(observable);
}

@Test
public void testWithError3() {
Observable<Boolean> observable = Observable.sequenceEqual(
Observable.concat(Observable.from("one"),
Observable.<String> error(new TestException())),
Observable.concat(Observable.from("one"),
Observable.<String> error(new TestException())));
verifyError(observable);
}

@Test
public void testWithEmpty1() {
Observable<Boolean> observable = Observable.sequenceEqual(
Observable.<String> empty(),
Observable.from("one", "two", "three"));
verifyResult(observable, false);
}

@Test
public void testWithEmpty2() {
Observable<Boolean> observable = Observable.sequenceEqual(
Observable.from("one", "two", "three"),
Observable.<String> empty());
verifyResult(observable, false);
}

@Test
public void testWithEmpty3() {
Observable<Boolean> observable = Observable.sequenceEqual(
Observable.<String> empty(), Observable.<String> empty());
verifyResult(observable, true);
}

@Test
public void testWithNull1() {
Observable<Boolean> observable = Observable.sequenceEqual(
Observable.from((String) null), Observable.from("one"));
verifyResult(observable, false);
}

@Test
public void testWithNull2() {
Observable<Boolean> observable = Observable.sequenceEqual(
Observable.from((String) null), Observable.from((String) null));
verifyResult(observable, true);
}

@Test
public void testWithEqualityError() {
Observable<Boolean> observable = Observable.sequenceEqual(
Observable.from("one"), Observable.from("one"),
new Func2<String, String, Boolean>() {
@Override
public Boolean call(String t1, String t2) {
throw new TestException();
}
});
verifyError(observable);
}

private void verifyResult(Observable<Boolean> observable, boolean result) {
@SuppressWarnings("unchecked")
Observer<Boolean> observer = mock(Observer.class);
observable.subscribe(observer);

InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext(result);
inOrder.verify(observer).onCompleted();
inOrder.verifyNoMoreInteractions();
}

private void verifyError(Observable<Boolean> observable) {
@SuppressWarnings("unchecked")
Observer<Boolean> observer = mock(Observer.class);
observable.subscribe(observer);

InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onError(isA(TestException.class));
inOrder.verifyNoMoreInteractions();
}

private class TestException extends RuntimeException {
private static final long serialVersionUID = 1L;
}
}

0 comments on commit c44d7cb

Please sign in to comment.