Skip to content

Commit

Permalink
Merge pull request ReactiveX#398 from benjchristensen/merge-385-any
Browse files Browse the repository at this point in the history
Merge 'any' Pull Request
  • Loading branch information
benjchristensen committed Sep 21, 2013
2 parents 05689c8 + 3cd8d20 commit 8129a47
Show file tree
Hide file tree
Showing 5 changed files with 316 additions and 4 deletions.
31 changes: 30 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import rx.operators.OperationZip;
import rx.operators.SafeObservableSubscription;
import rx.operators.SafeObserver;
import rx.operators.OperationAny;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
Expand Down Expand Up @@ -3138,6 +3139,22 @@ public <U> Observable<T> distinct(Func1<? super T, ? extends U> keySelector, Com
return create(OperationDistinct.distinct(this, keySelector, equalityComparator));
}

/**
* Returns an {@link Observable} that emits <code>true</code> if any element of the source {@link Observable} satisfies
* the given condition, otherwise <code>false</code>. Note: always emit <code>false</code> if the source {@link Observable} is empty.
* <p>
* In Rx.Net this is the <code>any</code> operator but renamed in RxJava to better match Java naming idioms.
*
* @param predicate
* The condition to test every element.
* @return A subscription function for creating the target Observable.
* @see <a href= "http://msdn.microsoft.com/en-us/library/hh211993(v=vs.103).aspx" >MSDN: Observable.Any</a> Note: the description in this page is
* wrong.
*/
public Observable<Boolean> exists(Func1<? super T, Boolean> predicate) {
return create(OperationAny.exists(this, predicate));
}

/**
* Registers an {@link Action0} to be called when this Observable invokes {@link Observer#onCompleted onCompleted} or {@link Observer#onError onError}.
* <p>
Expand Down Expand Up @@ -4319,7 +4336,19 @@ public <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T,
public <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ? extends K> keySelector) {
return create(OperationGroupBy.groupBy(this, keySelector));
}


/**
* Returns an {@link Observable} that emits <code>true</code> if the source {@link Observable} is empty, otherwise <code>false</code>.
* <p>
* In Rx.Net this is negated as the <code>any</code> operator but renamed in RxJava to better match Java naming idioms.
*
* @return A subscription function for creating the target Observable.
* @see <a href= "http://msdn.microsoft.com/en-us/library/hh229905(v=vs.103).aspx" >MSDN: Observable.Any</a>
*/
public Observable<Boolean> isEmpty() {
return create(OperationAny.isEmpty(this));
}

/**
* Converts an Observable into a {@link BlockingObservable} (an Observable with blocking
* operators).
Expand Down
286 changes: 286 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationAny.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
package rx.operators;

import static org.mockito.Mockito.*;
import static rx.util.functions.Functions.*;

import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.util.functions.Func1;

/**
* Returns an {@link Observable} that emits <code>true</code> if any element of
* an observable sequence satisfies a condition, otherwise <code>false</code>.
*/
public final class OperationAny {

/**
* Returns an {@link Observable} that emits <code>true</code> if the source {@link Observable} is not empty, otherwise <code>false</code>.
*
* @param source
* The source {@link Observable} to check if not empty.
* @return A subscription function for creating the target Observable.
*/
public static <T> OnSubscribeFunc<Boolean> any(Observable<? extends T> source) {
return new Any<T>(source, alwaysTrue(), false);
}

public static <T> OnSubscribeFunc<Boolean> isEmpty(Observable<? extends T> source) {
return new Any<T>(source, alwaysTrue(), true);
}

/**
* Returns an {@link Observable} that emits <code>true</code> if any element
* of the source {@link Observable} satisfies the given condition, otherwise
* <code>false</code>. Note: always emit <code>false</code> if the source {@link Observable} is empty.
*
* @param source
* The source {@link Observable} to check if any element
* satisfies the given condition.
* @param predicate
* The condition to test every element.
* @return A subscription function for creating the target Observable.
*/
public static <T> OnSubscribeFunc<Boolean> any(Observable<? extends T> source, Func1<? super T, Boolean> predicate) {
return new Any<T>(source, predicate, false);
}

public static <T> OnSubscribeFunc<Boolean> exists(Observable<? extends T> source, Func1<? super T, Boolean> predicate) {
return any(source, predicate);
}

private static class Any<T> implements OnSubscribeFunc<Boolean> {

private final Observable<? extends T> source;
private final Func1<? super T, Boolean> predicate;
private final boolean returnOnEmpty;

private Any(Observable<? extends T> source, Func1<? super T, Boolean> predicate, boolean returnOnEmpty) {
this.source = source;
this.predicate = predicate;
this.returnOnEmpty = returnOnEmpty;
}

@Override
public Subscription onSubscribe(final Observer<? super Boolean> observer) {
final SafeObservableSubscription subscription = new SafeObservableSubscription();
return subscription.wrap(source.subscribe(new Observer<T>() {

private final AtomicBoolean hasEmitted = new AtomicBoolean(false);

@Override
public void onNext(T value) {
try {
if (hasEmitted.get() == false) {
if (predicate.call(value) == true
&& hasEmitted.getAndSet(true) == false) {
observer.onNext(!returnOnEmpty);
observer.onCompleted();
// this will work if the sequence is asynchronous, it
// will have no effect on a synchronous observable
subscription.unsubscribe();
}
}
} catch (Throwable ex) {
observer.onError(ex);
// this will work if the sequence is asynchronous, it
// will have no effect on a synchronous observable
subscription.unsubscribe();
}

}

@Override
public void onError(Throwable ex) {
observer.onError(ex);
}

@Override
public void onCompleted() {
if (!hasEmitted.get()) {
observer.onNext(returnOnEmpty);
observer.onCompleted();
}
}
}));
}

}

public static class UnitTest {

@Test
public void testAnyWithTwoItems() {
Observable<Integer> w = Observable.from(1, 2);
Observable<Boolean> observable = Observable.create(any(w));

@SuppressWarnings("unchecked")
Observer<Boolean> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, never()).onNext(false);
verify(aObserver, times(1)).onNext(true);
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testIsEmptyWithTwoItems() {
Observable<Integer> w = Observable.from(1, 2);
Observable<Boolean> observable = Observable.create(isEmpty(w));

@SuppressWarnings("unchecked")
Observer<Boolean> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, never()).onNext(true);
verify(aObserver, times(1)).onNext(false);
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testAnyWithOneItem() {
Observable<Integer> w = Observable.from(1);
Observable<Boolean> observable = Observable.create(any(w));

@SuppressWarnings("unchecked")
Observer<Boolean> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, never()).onNext(false);
verify(aObserver, times(1)).onNext(true);
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testIsEmptyWithOneItem() {
Observable<Integer> w = Observable.from(1);
Observable<Boolean> observable = Observable.create(isEmpty(w));

@SuppressWarnings("unchecked")
Observer<Boolean> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, never()).onNext(true);
verify(aObserver, times(1)).onNext(false);
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testAnyWithEmpty() {
Observable<Integer> w = Observable.empty();
Observable<Boolean> observable = Observable.create(any(w));

@SuppressWarnings("unchecked")
Observer<Boolean> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, times(1)).onNext(false);
verify(aObserver, never()).onNext(true);
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testIsEmptyWithEmpty() {
Observable<Integer> w = Observable.empty();
Observable<Boolean> observable = Observable.create(isEmpty(w));

@SuppressWarnings("unchecked")
Observer<Boolean> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, times(1)).onNext(true);
verify(aObserver, never()).onNext(false);
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testAnyWithPredicate1() {
Observable<Integer> w = Observable.from(1, 2, 3);
Observable<Boolean> observable = Observable.create(any(w,
new Func1<Integer, Boolean>() {

@Override
public Boolean call(Integer t1) {
return t1 < 2;
}
}));

@SuppressWarnings("unchecked")
Observer<Boolean> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, never()).onNext(false);
verify(aObserver, times(1)).onNext(true);
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testExists1() {
Observable<Integer> w = Observable.from(1, 2, 3);
Observable<Boolean> observable = Observable.create(exists(w,
new Func1<Integer, Boolean>() {

@Override
public Boolean call(Integer t1) {
return t1 < 2;
}
}));

@SuppressWarnings("unchecked")
Observer<Boolean> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, never()).onNext(false);
verify(aObserver, times(1)).onNext(true);
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testAnyWithPredicate2() {
Observable<Integer> w = Observable.from(1, 2, 3);
Observable<Boolean> observable = Observable.create(any(w,
new Func1<Integer, Boolean>() {

@Override
public Boolean call(Integer t1) {
return t1 < 1;
}
}));

@SuppressWarnings("unchecked")
Observer<Boolean> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, times(1)).onNext(false);
verify(aObserver, never()).onNext(true);
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testAnyWithEmptyAndPredicate() {
// If the source is empty, always output false.
Observable<Integer> w = Observable.empty();
Observable<Boolean> observable = Observable.create(any(w,
new Func1<Integer, Boolean>() {

@Override
public Boolean call(Integer t1) {
return true;
}
}));

@SuppressWarnings("unchecked")
Observer<Boolean> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, times(1)).onNext(false);
verify(aObserver, never()).onNext(true);
verify(aObserver, never()).onError(org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}
}
}
1 change: 0 additions & 1 deletion rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package rx.subjects;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.ConcurrentHashMap;
Expand Down
1 change: 0 additions & 1 deletion rxjava-core/src/main/java/rx/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package rx.subjects;

import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.ArrayList;
Expand Down
1 change: 0 additions & 1 deletion rxjava-core/src/main/java/rx/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package rx.subjects;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.ArrayList;
Expand Down

0 comments on commit 8129a47

Please sign in to comment.