diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index c21913f86a..667ec0db79 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -36,6 +36,7 @@ import rx.operators.OperationCombineLatest; import rx.operators.OperationConcat; import rx.operators.OperationDebounce; +import rx.operators.OperationDefaultIfEmpty; import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; import rx.operators.OperationDistinct; @@ -3864,6 +3865,21 @@ public Observable firstOrDefault(Func1 predicate, T defau return create(OperationFirstOrDefault.firstOrDefault(this, predicate, defaultValue)); } + /** + * Returns the elements of the specified sequence or the specified default + * value in a singleton sequence if the sequence is empty. + * + * @param defaultValue + * The value to return if the sequence is empty. + * @return An observable sequence that contains the specified default value + * if the source is empty; otherwise, the elements of the source + * itself. + * + * @see MSDN: Observable.DefaultIfEmpty + */ + public Observable defaultIfEmpty(T defaultValue) { + return create(OperationDefaultIfEmpty.defaultIfEmpty(this, defaultValue)); + } /** * Returns an Observable that emits only the first num items emitted by the source diff --git a/rxjava-core/src/main/java/rx/operators/OperationDefaultIfEmpty.java b/rxjava-core/src/main/java/rx/operators/OperationDefaultIfEmpty.java new file mode 100644 index 0000000000..195422dadc --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationDefaultIfEmpty.java @@ -0,0 +1,122 @@ +package rx.operators; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.junit.Test; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; + +/** + * Returns the elements of the specified sequence or the specified default value + * in a singleton sequence if the sequence is empty. + */ +public class OperationDefaultIfEmpty { + + /** + * Returns the elements of the specified sequence or the specified default + * value in a singleton sequence if the sequence is empty. + * + * @param source + * The sequence to return the specified value for if it is empty. + * @param defaultValue + * The value to return if the sequence is empty. + * @return An observable sequence that contains the specified default value + * if the source is empty; otherwise, the elements of the source + * itself. + */ + public static OnSubscribeFunc defaultIfEmpty( + Observable source, T defaultValue) { + return new DefaultIfEmpty(source, defaultValue); + } + + private static class DefaultIfEmpty implements OnSubscribeFunc { + + private final Observable source; + private final T defaultValue; + + private DefaultIfEmpty(Observable source, T defaultValue) { + this.source = source; + this.defaultValue = defaultValue; + } + + @Override + public Subscription onSubscribe(final Observer observer) { + final SafeObservableSubscription subscription = new SafeObservableSubscription(); + return subscription.wrap(source.subscribe(new Observer() { + + private volatile boolean hasEmitted = false; + + @Override + public void onNext(T value) { + try { + hasEmitted = true; + observer.onNext(value); + } catch (Throwable ex) { + observer.onError(ex); + // this will work if the sequence is asynchronous, it + // will have no effect on a synchronous observable + subscription.unsubscribe(); + } + } + + @Override + public void onError(Throwable ex) { + observer.onError(ex); + } + + @Override + public void onCompleted() { + if (hasEmitted) { + observer.onCompleted(); + } else { + observer.onNext(defaultValue); + observer.onCompleted(); + } + } + })); + } + } + + public static class UnitTest { + + @Test + public void testDefaultIfEmpty() { + Observable source = Observable.from(1, 2, 3); + Observable observable = Observable.create(defaultIfEmpty( + source, 10)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, never()).onNext(10); + verify(aObserver, times(1)).onNext(1); + verify(aObserver, times(1)).onNext(2); + verify(aObserver, times(1)).onNext(3); + verify(aObserver, never()).onError( + org.mockito.Matchers.any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testDefaultIfEmptyWithEmpty() { + Observable source = Observable.empty(); + Observable observable = Observable.create(defaultIfEmpty( + source, 10)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, times(1)).onNext(10); + verify(aObserver, never()).onError( + org.mockito.Matchers.any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + } +}