Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented the 'DefaultIfEmpty' operator. See #34 #401

Merged
merged 3 commits into from
Sep 25, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import rx.operators.OperationCombineLatest;
import rx.operators.OperationConcat;
import rx.operators.OperationDebounce;
import rx.operators.OperationDefaultIfEmpty;
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
import rx.operators.OperationDistinct;
Expand Down Expand Up @@ -3864,6 +3865,21 @@ public Observable<T> firstOrDefault(Func1<? super T, Boolean> predicate, T defau
return create(OperationFirstOrDefault.firstOrDefault(this, predicate, defaultValue));
}

/**
* Returns the elements of the specified sequence or the specified default
* value in a singleton sequence if the sequence is empty.
*
* @param defaultValue
* The value to return if the sequence is empty.
* @return An observable sequence that contains the specified default value
* if the source is empty; otherwise, the elements of the source
* itself.
*
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229624(v=vs.103).aspx">MSDN: Observable.DefaultIfEmpty</a>
*/
public Observable<T> defaultIfEmpty(T defaultValue) {
return create(OperationDefaultIfEmpty.defaultIfEmpty(this, defaultValue));
}

/**
* Returns an Observable that emits only the first <code>num</code> items emitted by the source
Expand Down
122 changes: 122 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationDefaultIfEmpty.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package rx.operators;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import org.junit.Test;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;

/**
* Returns the elements of the specified sequence or the specified default value
* in a singleton sequence if the sequence is empty.
*/
public class OperationDefaultIfEmpty {

/**
* Returns the elements of the specified sequence or the specified default
* value in a singleton sequence if the sequence is empty.
*
* @param source
* The sequence to return the specified value for if it is empty.
* @param defaultValue
* The value to return if the sequence is empty.
* @return An observable sequence that contains the specified default value
* if the source is empty; otherwise, the elements of the source
* itself.
*/
public static <T> OnSubscribeFunc<T> defaultIfEmpty(
Observable<? extends T> source, T defaultValue) {
return new DefaultIfEmpty<T>(source, defaultValue);
}

private static class DefaultIfEmpty<T> implements OnSubscribeFunc<T> {

private final Observable<? extends T> source;
private final T defaultValue;

private DefaultIfEmpty(Observable<? extends T> source, T defaultValue) {
this.source = source;
this.defaultValue = defaultValue;
}

@Override
public Subscription onSubscribe(final Observer<? super T> observer) {
final SafeObservableSubscription subscription = new SafeObservableSubscription();
return subscription.wrap(source.subscribe(new Observer<T>() {

private volatile boolean hasEmitted = false;

@Override
public void onNext(T value) {
try {
hasEmitted = true;
observer.onNext(value);
} catch (Throwable ex) {
observer.onError(ex);
// this will work if the sequence is asynchronous, it
// will have no effect on a synchronous observable
subscription.unsubscribe();
}
}

@Override
public void onError(Throwable ex) {
observer.onError(ex);
}

@Override
public void onCompleted() {
if (hasEmitted) {
observer.onCompleted();
} else {
observer.onNext(defaultValue);
observer.onCompleted();
}
}
}));
}
}

public static class UnitTest {

@Test
public void testDefaultIfEmpty() {
Observable<Integer> source = Observable.from(1, 2, 3);
Observable<Integer> observable = Observable.create(defaultIfEmpty(
source, 10));

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, never()).onNext(10);
verify(aObserver, times(1)).onNext(1);
verify(aObserver, times(1)).onNext(2);
verify(aObserver, times(1)).onNext(3);
verify(aObserver, never()).onError(
org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testDefaultIfEmptyWithEmpty() {
Observable<Integer> source = Observable.empty();
Observable<Integer> observable = Observable.create(defaultIfEmpty(
source, 10));

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, times(1)).onNext(10);
verify(aObserver, never()).onError(
org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

}
}