Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MergeWith, ConcatWith, AmbWith #1357

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3070,6 +3070,24 @@ public final static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> zip(Ob
public final Observable<Boolean> all(Func1<? super T, Boolean> predicate) {
return lift(new OperatorAll<T>(predicate));
}

/**
* Mirrors the first Observable (current or provided) that emits an item.
* <p>
* <img width="640" height="385" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/amb.png">
* <p>
* {@code amb} does not operate by default on a particular {@link Scheduler}.
*
* @param o1
* an Observable competing to react first
* @return an Observable that emits the same sequence of items as whichever of the source Observables first
* emitted an item
* @see <a href="https://github.com/Netflix/RxJava/wiki/Conditional-and-Boolean-Operators#wiki-amb">RxJava Wiki: amb()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733.aspx">MSDN: Observable.Amb</a>
*/
public final Observable<T> ambWith(Observable<? extends T> t1) {
return amb(this, t1);
}

/**
* Disguises a object of an Observable subclass as a simple Observable object. Useful for instance when you
Expand Down Expand Up @@ -3475,6 +3493,25 @@ public final R call(R state, T value) {
public final <R> Observable<R> concatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
return concat(map(func));
}

/**
* Returns an Observable that emits the items emitted from the current Observable, then the next, one after the other, without
* interleaving them.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/concat.png">
* <p>
* {@code concat} does not operate by default on a particular {@link Scheduler}.
*
* @param t1
* an Observable to be concatenated after the current
* @return an Observable that emits items emitted by the two source Observables, one after the other,
* without interleaving them
* @see <a href="https://github.com/Netflix/RxJava/wiki/Mathematical-and-Aggregate-Operators#wiki-concat">RxJava Wiki: concat()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.concat.aspx">MSDN: Observable.Concat</a>
*/
public final Observable<T> concatWith(Observable<? extends T> t1) {
return concat(this, t1);
}

/**
* Returns an Observable that emits a Boolean that indicates whether the source Observable emitted a
Expand Down Expand Up @@ -4767,6 +4804,26 @@ public final <U, R> Observable<R> mergeMapIterable(Func1<? super T, ? extends It
return mergeMap(OperatorMergeMapPair.convertSelector(collectionSelector), resultSelector);
}

/**
* Flattens this and another Observable into a single Observable, without any transformation.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
* <p>
* You can combine items emitted by multiple Observables so that they appear as a single Observable, by
* using the {@code merge} method.
* <p>
* {@code merge} does not operate by default on a particular {@link Scheduler}.
*
* @param t1
* an Observable to be merged
* @return an Observable that emits all of the items emitted by the source Observables
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#wiki-merge">RxJava Wiki: merge()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099.aspx">MSDN: Observable.Merge</a>
*/
public final Observable<T> mergeWith(Observable<? extends T> t1) {
return merge(this, t1);
}

/**
* Returns an Observable that emits items produced by multicasting the source Observable within a selector
* function.
Expand Down
24 changes: 23 additions & 1 deletion rxjava-core/src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import rx.Observable.OnSubscribe;

import rx.Observable.OnSubscribe;
import rx.functions.Action1;
import rx.functions.Action2;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observables.ConnectableObservable;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;
import rx.subscriptions.BooleanSubscription;

Expand Down Expand Up @@ -993,5 +994,26 @@ public void call(StringBuilder sb, Integer v) {

assertEquals("1-2-3", value);
}

@Test
public void testMergeWith() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Observable.just(1).mergeWith(Observable.just(2)).subscribe(ts);
ts.assertReceivedOnNext(Arrays.asList(1, 2));
}

@Test
public void testConcatWith() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Observable.just(1).concatWith(Observable.just(2)).subscribe(ts);
ts.assertReceivedOnNext(Arrays.asList(1, 2));
}

@Test
public void testAmbWith() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Observable.just(1).ambWith(Observable.just(2)).subscribe(ts);
ts.assertReceivedOnNext(Arrays.asList(1));
}

}