Skip to content

Commit

Permalink
Merge pull request #3763 from akarnokd/CombineLatestDelayError1x
Browse files Browse the repository at this point in the history
1.x: combineLatestDelayError
  • Loading branch information
abersnaze committed Mar 14, 2016
2 parents c62472c + 4b80956 commit a42d0bf
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 2 deletions.
27 changes: 27 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,33 @@ public static <T, R> Observable<R> combineLatest(Iterable<? extends Observable<?
return create(new OnSubscribeCombineLatest<T, R>(sources, combineFunction));
}

/**
* Combines a collection of source Observables by emitting an item that aggregates the latest values of each of
* the source Observables each time an item is received from any of the source Observables, where this
* aggregation is defined by a specified function and delays any error from the sources until
* all source Observables terminate.
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code combineLatest} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T>
* the common base type of source values
* @param <R>
* the result type
* @param sources
* the collection of source Observables
* @param combineFunction
* the aggregation function used to combine the items emitted by the source Observables
* @return an Observable that emits items that are the result of combining the items emitted by the source
* Observables by means of the given aggregation function
* @see <a href="http://reactivex.io/documentation/operators/combinelatest.html">ReactiveX operators documentation: CombineLatest</a>
*/
public static <T, R> Observable<R> combineLatestDelayError(Iterable<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) {
return create(new OnSubscribeCombineLatest<T, R>(null, sources, combineFunction, RxRingBuffer.SIZE, true));
}

/**
* Returns an Observable that emits the items emitted by each of the Observables emitted by the source
* Observable, one after the other, without interleaving them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ void combine(Object value, int index) {
if (value != null && allSourcesFinished) {
queue.offer(combinerSubscriber, latest.clone());
} else
if (value == null && error.get() != null) {
if (value == null && error.get() != null && (o == MISSING || !delayError)) {
done = true; // if this source completed without a value
}
} else {
Expand Down
105 changes: 104 additions & 1 deletion src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import rx.*;
import rx.Observable;
import rx.Observer;
import rx.exceptions.*;
import rx.functions.*;
import rx.internal.util.RxRingBuffer;
import rx.observers.TestSubscriber;
Expand Down Expand Up @@ -954,5 +955,107 @@ public Integer call(Object... args) {
throw new RuntimeException();
}

};
};

@SuppressWarnings("unchecked")
@Test
public void firstJustError() {
TestSubscriber<Integer> ts = TestSubscriber.create();

Observable.combineLatestDelayError(
Arrays.asList(Observable.just(1), Observable.<Integer>error(new TestException())),
new FuncN<Integer>() {
@Override
public Integer call(Object... args) {
return ((Integer)args[0]) + ((Integer)args[1]);
}
}
).subscribe(ts);

ts.assertNoValues();
ts.assertError(TestException.class);
ts.assertNotCompleted();
}

@SuppressWarnings("unchecked")
@Test
public void secondJustError() {
TestSubscriber<Integer> ts = TestSubscriber.create();

Observable.combineLatestDelayError(
Arrays.asList(Observable.<Integer>error(new TestException()), Observable.just(1)),
new FuncN<Integer>() {
@Override
public Integer call(Object... args) {
return ((Integer)args[0]) + ((Integer)args[1]);
}
}
).subscribe(ts);

ts.assertNoValues();
ts.assertError(TestException.class);
ts.assertNotCompleted();
}

@SuppressWarnings("unchecked")
@Test
public void oneErrors() {
TestSubscriber<Integer> ts = TestSubscriber.create();

Observable.combineLatestDelayError(
Arrays.asList(Observable.just(10).concatWith(Observable.<Integer>error(new TestException())), Observable.just(1)),
new FuncN<Integer>() {
@Override
public Integer call(Object... args) {
return ((Integer)args[0]) + ((Integer)args[1]);
}
}
).subscribe(ts);

ts.assertValues(11);
ts.assertError(TestException.class);
ts.assertNotCompleted();
}

@SuppressWarnings("unchecked")
@Test
public void twoErrors() {
TestSubscriber<Integer> ts = TestSubscriber.create();

Observable.combineLatestDelayError(
Arrays.asList(Observable.just(1), Observable.just(10).concatWith(Observable.<Integer>error(new TestException()))),
new FuncN<Integer>() {
@Override
public Integer call(Object... args) {
return ((Integer)args[0]) + ((Integer)args[1]);
}
}
).subscribe(ts);

ts.assertValues(11);
ts.assertError(TestException.class);
ts.assertNotCompleted();
}

@SuppressWarnings("unchecked")
@Test
public void bothError() {
TestSubscriber<Integer> ts = TestSubscriber.create();

Observable.combineLatestDelayError(
Arrays.asList(Observable.just(1).concatWith(Observable.<Integer>error(new TestException())),
Observable.just(10).concatWith(Observable.<Integer>error(new TestException()))),
new FuncN<Integer>() {
@Override
public Integer call(Object... args) {
return ((Integer)args[0]) + ((Integer)args[1]);
}
}
).subscribe(ts);

ts.assertValues(11);
ts.assertError(CompositeException.class);
ts.assertNotCompleted();
}

}

0 comments on commit a42d0bf

Please sign in to comment.