From 35d8005a59314a4912845f3df4d50d2759303230 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 6 Dec 2013 19:46:34 +0800 Subject: [PATCH 1/2] Reimplement the 'SequenceEqual' operator using other operators --- rxjava-core/src/main/java/rx/Observable.java | 21 +-- .../rx/operators/OperationSequenceEqual.java | 73 +++++++++ .../src/test/java/rx/ObservableTests.java | 11 -- .../OperationSequenceEqualTests.java | 145 ++++++++++++++++++ 4 files changed, 229 insertions(+), 21 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationSequenceEqual.java create mode 100644 rxjava-core/src/test/java/rx/operators/OperationSequenceEqualTests.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 9482a5f09f..fb1669efa4 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -76,6 +76,7 @@ import rx.operators.OperationRetry; import rx.operators.OperationSample; import rx.operators.OperationScan; +import rx.operators.OperationSequenceEqual; import rx.operators.OperationSkip; import rx.operators.OperationSkipLast; import rx.operators.OperationSkipUntil; @@ -2298,16 +2299,16 @@ public static Observable from(Future future, long timeout, T } /** - * Returns an Observable that emits Boolean values that indicate whether the - * pairs of items emitted by two source Observables are equal. + * Returns an Observable that emits a Boolean value that indicate + * whether two sequences are equal by comparing the elements pairwise. *

* * * @param first the first Observable to compare * @param second the second Observable to compare * @param the type of items emitted by each Observable - * @return an Observable that emits Booleans that indicate whether the - * corresponding items emitted by the source Observables are equal + * @return an Observable that emits a Boolean value that indicate + * whether two sequences are equal by comparing the elements pairwise. * @see RxJava Wiki: sequenceEqual() */ public static Observable sequenceEqual(Observable first, Observable second) { @@ -2320,9 +2321,9 @@ public Boolean call(T first, T second) { } /** - * Returns an Observable that emits Boolean values that indicate whether the - * pairs of items emitted by two source Observables are equal based on the - * results of a specified equality function. + * Returns an Observable that emits a Boolean value that indicate + * whether two sequences are equal by comparing the elements pairwise + * based on the results of a specified equality function. *

* * @@ -2331,12 +2332,12 @@ public Boolean call(T first, T second) { * @param equality a function used to compare items emitted by both * Observables * @param the type of items emitted by each Observable - * @return an Observable that emits Booleans that indicate whether the - * corresponding items emitted by the source Observables are equal + * @return an Observable that emits a Boolean value that indicate + * whether two sequences are equal by comparing the elements pairwise. * @see RxJava Wiki: sequenceEqual() */ public static Observable sequenceEqual(Observable first, Observable second, Func2 equality) { - return zip(first, second, equality); + return OperationSequenceEqual.sequenceEqual(first, second, equality); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationSequenceEqual.java b/rxjava-core/src/main/java/rx/operators/OperationSequenceEqual.java new file mode 100644 index 0000000000..423ddbc358 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationSequenceEqual.java @@ -0,0 +1,73 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static rx.Observable.concat; +import static rx.Observable.from; +import static rx.Observable.zip; +import rx.Notification; +import rx.Observable; +import rx.util.functions.Func1; +import rx.util.functions.Func2; +import rx.util.functions.Functions; + +/** + * Returns an Observable that emits a Boolean value that indicate whether two + * sequences are equal by comparing the elements pairwise. + */ +public class OperationSequenceEqual { + + public static Observable sequenceEqual( + Observable first, Observable second, + final Func2 equality) { + Observable> firstObservable = concat( + first.map(new Func1>() { + + @Override + public Notification call(T t1) { + return new Notification(t1); + } + + }), from(new Notification())); + + Observable> secondObservable = concat( + second.map(new Func1>() { + + @Override + public Notification call(T t1) { + return new Notification(t1); + } + + }), from(new Notification())); + + return zip(firstObservable, secondObservable, + new Func2, Notification, Boolean>() { + + @Override + public Boolean call(Notification t1, Notification t2) { + if (t1.isOnCompleted() && t2.isOnCompleted()) { + return true; + } + if (t1.isOnCompleted() || t2.isOnCompleted()) { + return false; + } + // Now t1 and t2 must be 'onNext'. + return equality.call(t1.getValue(), t2.getValue()); + } + + }).all(Functions. identity()); + } +} diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index 3d718210a1..e4023f44ab 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -275,17 +275,6 @@ public Integer call(Integer t1, Integer t2) { verify(w).onNext(60); } - @Test - public void testSequenceEqual() { - Observable first = Observable.from(1, 2, 3); - Observable second = Observable.from(1, 2, 4); - @SuppressWarnings("unchecked") - Observer result = mock(Observer.class); - Observable.sequenceEqual(first, second).subscribe(result); - verify(result, times(2)).onNext(true); - verify(result, times(1)).onNext(false); - } - @Test public void testOnSubscribeFails() { @SuppressWarnings("unchecked") diff --git a/rxjava-core/src/test/java/rx/operators/OperationSequenceEqualTests.java b/rxjava-core/src/test/java/rx/operators/OperationSequenceEqualTests.java new file mode 100644 index 0000000000..a3cc8f1521 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationSequenceEqualTests.java @@ -0,0 +1,145 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observer; + +public class OperationSequenceEqualTests { + + @Test + public void test1() { + Observable observable = Observable.sequenceEqual( + Observable.from("one", "two", "three"), + Observable.from("one", "two", "three")); + verifyResult(observable, true); + } + + @Test + public void test2() { + Observable observable = Observable.sequenceEqual( + Observable.from("one", "two", "three"), + Observable.from("one", "two", "three", "four")); + verifyResult(observable, false); + } + + @Test + public void test3() { + Observable observable = Observable.sequenceEqual( + Observable.from("one", "two", "three", "four"), + Observable.from("one", "two", "three")); + verifyResult(observable, false); + } + + @Test + public void testWithError1() { + Observable observable = Observable.sequenceEqual( + Observable.concat(Observable.from("one"), + Observable. error(new TestException())), + Observable.from("one", "two", "three")); + verifyError(observable); + } + + @Test + public void testWithError2() { + Observable observable = Observable.sequenceEqual( + Observable.from("one", "two", "three"), + Observable.concat(Observable.from("one"), + Observable. error(new TestException()))); + verifyError(observable); + } + + @Test + public void testWithError3() { + Observable observable = Observable.sequenceEqual( + Observable.concat(Observable.from("one"), + Observable. error(new TestException())), + Observable.concat(Observable.from("one"), + Observable. error(new TestException()))); + verifyError(observable); + } + + @Test + public void testWithEmpty1() { + Observable observable = Observable.sequenceEqual( + Observable. empty(), + Observable.from("one", "two", "three")); + verifyResult(observable, false); + } + + @Test + public void testWithEmpty2() { + Observable observable = Observable.sequenceEqual( + Observable.from("one", "two", "three"), + Observable. empty()); + verifyResult(observable, false); + } + + @Test + public void testWithEmpty3() { + Observable observable = Observable.sequenceEqual( + Observable. empty(), Observable. empty()); + verifyResult(observable, true); + } + + @Test + public void testWithEqualityError() { + Observable observable = Observable.sequenceEqual( + Observable.from((String) null), Observable.from("one")); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + isA(NullPointerException.class)); + inOrder.verifyNoMoreInteractions(); + } + + private void verifyResult(Observable observable, boolean result) { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(result); + inOrder.verify(observer).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + private void verifyError(Observable observable) { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError(isA(TestException.class)); + inOrder.verifyNoMoreInteractions(); + } + + private class TestException extends RuntimeException { + private static final long serialVersionUID = 1L; + } +} From b753147c631841a4492fa5432e1f2ff86cc8a6ab Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sat, 7 Dec 2013 23:01:17 +0800 Subject: [PATCH 2/2] Fix the 'null' issue in the default equality --- rxjava-core/src/main/java/rx/Observable.java | 3 ++ .../OperationSequenceEqualTests.java | 29 ++++++++++++++----- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index fb1669efa4..ff304d42e4 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -2315,6 +2315,9 @@ public static Observable sequenceEqual(Observable firs return sequenceEqual(first, second, new Func2() { @Override public Boolean call(T first, T second) { + if(first == null) { + return second == null; + } return first.equals(second); } }); diff --git a/rxjava-core/src/test/java/rx/operators/OperationSequenceEqualTests.java b/rxjava-core/src/test/java/rx/operators/OperationSequenceEqualTests.java index a3cc8f1521..71aa64130c 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationSequenceEqualTests.java +++ b/rxjava-core/src/test/java/rx/operators/OperationSequenceEqualTests.java @@ -25,6 +25,7 @@ import rx.Observable; import rx.Observer; +import rx.util.functions.Func2; public class OperationSequenceEqualTests { @@ -104,18 +105,30 @@ public void testWithEmpty3() { } @Test - public void testWithEqualityError() { + public void testWithNull1() { Observable observable = Observable.sequenceEqual( Observable.from((String) null), Observable.from("one")); + verifyResult(observable, false); + } - @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); - observable.subscribe(observer); + @Test + public void testWithNull2() { + Observable observable = Observable.sequenceEqual( + Observable.from((String) null), Observable.from((String) null)); + verifyResult(observable, true); + } - InOrder inOrder = inOrder(observer); - inOrder.verify(observer, times(1)).onError( - isA(NullPointerException.class)); - inOrder.verifyNoMoreInteractions(); + @Test + public void testWithEqualityError() { + Observable observable = Observable.sequenceEqual( + Observable.from("one"), Observable.from("one"), + new Func2() { + @Override + public Boolean call(String t1, String t2) { + throw new TestException(); + } + }); + verifyError(observable); } private void verifyResult(Observable observable, boolean result) {