Skip to content

Commit

Permalink
Merge pull request ReactiveX#401 from zsxwing/default_if_empty
Browse files Browse the repository at this point in the history
Implemented the 'DefaultIfEmpty' operator. See ReactiveX#34
  • Loading branch information
benjchristensen committed Sep 25, 2013
2 parents d612337 + 941939c commit 7239e57
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 0 deletions.
16 changes: 16 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import rx.operators.OperationCombineLatest;
import rx.operators.OperationConcat;
import rx.operators.OperationDebounce;
import rx.operators.OperationDefaultIfEmpty;
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
import rx.operators.OperationDistinct;
Expand Down Expand Up @@ -3885,6 +3886,21 @@ public Observable<T> firstOrDefault(Func1<? super T, Boolean> predicate, T defau
return create(OperationFirstOrDefault.firstOrDefault(this, predicate, defaultValue));
}

/**
* Returns the elements of the specified sequence or the specified default
* value in a singleton sequence if the sequence is empty.
*
* @param defaultValue
* The value to return if the sequence is empty.
* @return An observable sequence that contains the specified default value
* if the source is empty; otherwise, the elements of the source
* itself.
*
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229624(v=vs.103).aspx">MSDN: Observable.DefaultIfEmpty</a>
*/
public Observable<T> defaultIfEmpty(T defaultValue) {
return create(OperationDefaultIfEmpty.defaultIfEmpty(this, defaultValue));
}

/**
* Returns an Observable that emits only the first <code>num</code> items emitted by the source
Expand Down
122 changes: 122 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationDefaultIfEmpty.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package rx.operators;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import org.junit.Test;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;

/**
* Returns the elements of the specified sequence or the specified default value
* in a singleton sequence if the sequence is empty.
*/
public class OperationDefaultIfEmpty {

/**
* Returns the elements of the specified sequence or the specified default
* value in a singleton sequence if the sequence is empty.
*
* @param source
* The sequence to return the specified value for if it is empty.
* @param defaultValue
* The value to return if the sequence is empty.
* @return An observable sequence that contains the specified default value
* if the source is empty; otherwise, the elements of the source
* itself.
*/
public static <T> OnSubscribeFunc<T> defaultIfEmpty(
Observable<? extends T> source, T defaultValue) {
return new DefaultIfEmpty<T>(source, defaultValue);
}

private static class DefaultIfEmpty<T> implements OnSubscribeFunc<T> {

private final Observable<? extends T> source;
private final T defaultValue;

private DefaultIfEmpty(Observable<? extends T> source, T defaultValue) {
this.source = source;
this.defaultValue = defaultValue;
}

@Override
public Subscription onSubscribe(final Observer<? super T> observer) {
final SafeObservableSubscription subscription = new SafeObservableSubscription();
return subscription.wrap(source.subscribe(new Observer<T>() {

private volatile boolean hasEmitted = false;

@Override
public void onNext(T value) {
try {
hasEmitted = true;
observer.onNext(value);
} catch (Throwable ex) {
observer.onError(ex);
// this will work if the sequence is asynchronous, it
// will have no effect on a synchronous observable
subscription.unsubscribe();
}
}

@Override
public void onError(Throwable ex) {
observer.onError(ex);
}

@Override
public void onCompleted() {
if (hasEmitted) {
observer.onCompleted();
} else {
observer.onNext(defaultValue);
observer.onCompleted();
}
}
}));
}
}

public static class UnitTest {

@Test
public void testDefaultIfEmpty() {
Observable<Integer> source = Observable.from(1, 2, 3);
Observable<Integer> observable = Observable.create(defaultIfEmpty(
source, 10));

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, never()).onNext(10);
verify(aObserver, times(1)).onNext(1);
verify(aObserver, times(1)).onNext(2);
verify(aObserver, times(1)).onNext(3);
verify(aObserver, never()).onError(
org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testDefaultIfEmptyWithEmpty() {
Observable<Integer> source = Observable.empty();
Observable<Integer> observable = Observable.create(defaultIfEmpty(
source, 10));

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, times(1)).onNext(10);
verify(aObserver, never()).onError(
org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

}
}

0 comments on commit 7239e57

Please sign in to comment.