Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: Added MergeDelay operators for Iterable of Observables #3627

Merged
merged 1 commit into from
Feb 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2186,6 +2186,65 @@ public final static <T> Observable<T> mergeDelayError(Observable<? extends Obser
return source.lift(OperatorMerge.<T>instance(true, maxConcurrent));
}

/**
* Flattens an Iterable of Observables into one Observable, in a way that allows an Observer to receive all
* successfully emitted items from each of the source Observables without being interrupted by an error
* notification from one of them.
* <p>
* This behaves like {@link #merge(Observable)} except that if any of the merged Observables notify of an
* error via {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that
* error notification until all of the merged Observables have finished emitting items.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="">
* <p>
* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only
* invoke the {@code onError} method of its Observers once.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param sequences
* the Iterable of Observables
* @return an Observable that emits items that are the result of flattening the items emitted by the
* Observables in the Iterable
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
public static <T> Observable<T> mergeDelayError(Iterable<? extends Observable<? extends T>> sequences) {
return mergeDelayError(from(sequences));
}

/**
* Flattens an Iterable of Observables into one Observable, in a way that allows an Observer to receive all
* successfully emitted items from each of the source Observables without being interrupted by an error
* notification from one of them, while limiting the number of concurrent subscriptions to these Observables.
* <p>
* This behaves like {@link #merge(Observable)} except that if any of the merged Observables notify of an
* error via {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that
* error notification until all of the merged Observables have finished emitting items.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="">
* <p>
* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only
* invoke the {@code onError} method of its Observers once.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param sequences
* the Iterable of Observables
* @param maxConcurrent
* the maximum number of Observables that may be subscribed to concurrently
* @return an Observable that emits items that are the result of flattening the items emitted by the
* Observables in the Iterable
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
public static <T> Observable<T> mergeDelayError(Iterable<? extends Observable<? extends T>> sequences, int maxConcurrent) {
return mergeDelayError(from(sequences), maxConcurrent);
}


/**
* Flattens two Observables into one Observable, in a way that allows an Observer to receive all
* successfully emitted items from each of the source Observables without being interrupted by an error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,23 @@ public void testMergeList() {
verify(stringObserver, times(2)).onNext("hello");
}

// This is pretty much a clone of testMergeList but with the overloaded MergeDelayError for Iterables
@Test
public void mergeIterable() {
final Observable<String> o1 = Observable.create(new TestSynchronousObservable());
final Observable<String> o2 = Observable.create(new TestSynchronousObservable());
List<Observable<String>> listOfObservables = new ArrayList<Observable<String>>();
listOfObservables.add(o1);
listOfObservables.add(o2);

Observable<String> m = Observable.mergeDelayError(listOfObservables);
m.subscribe(stringObserver);

verify(stringObserver, never()).onError(any(Throwable.class));
verify(stringObserver, times(1)).onCompleted();
verify(stringObserver, times(2)).onNext("hello");
}

@Test
public void testMergeArrayWithThreading() {
final TestASynchronousObservable o1 = new TestASynchronousObservable();
Expand Down Expand Up @@ -577,4 +594,4 @@ public void call(Long t1) {
assertTrue(ts.getOnErrorEvents().get(0) instanceof TestException);
assertEquals(Arrays.asList(1L, 1L, 1L), requests);
}
}
}