Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flatMap and concatMap to RxScala #1304

Merged
merged 2 commits into from
Jun 2, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1108,4 +1108,72 @@ class RxScalaDemo extends JUnitSuite {
}
o.toBlockingObservable.foreach(println(_))
}

@Test def flatMapExample() {
val o = Observable.items(10, 100)
o.flatMap(n => Observable.interval(200 millis).map(_ * n))
.take(20)
.toBlocking.foreach(println)
}

@Test def flatMapExample2() {
val o = Observable.items(10, 100)
val o1 = for (n <- o;
i <- Observable.interval(200 millis)) yield i * n
o1.take(20).toBlocking.foreach(println)
}

@Test def flatMapExample3() {
val o = Observable[Int] {
subscriber =>
subscriber.onNext(10)
subscriber.onNext(100)
subscriber.onError(new IOException("Oops"))
}
o.flatMap(
(n: Int) => Observable.interval(200 millis).map(_ * n),
e => Observable.interval(200 millis).map(_ * -1),
() => Observable.interval(200 millis).map(_ * 1000)
).take(20)
.toBlocking.foreach(println)
}

@Test def flatMapExample4() {
val o = Observable.items(10, 100)
o.flatMap(
(n: Int) => Observable.interval(200 millis).map(_ * n),
e => Observable.interval(200 millis).map(_ * -1),
() => Observable.interval(200 millis).map(_ * 1000)
).take(20)
.toBlocking.foreach(println)
}

@Test def flatMapExample5() {
val o = Observable.items(1, 10, 100, 1000)
o.flatMap(
(n: Int) => Observable.interval(200 millis).take(5),
(n: Int, m: Long) => n * m
).toBlocking.foreach(println)
}

@Test def flatMapIterableExample() {
val o = Observable.items(10, 100)
o.flatMapIterable(n => (1 to 20).map(_ * n))
.toBlocking.foreach(println)
}

@Test def flatMapIterableExample2() {
val o = Observable.items(1, 10, 100, 1000)
o.flatMapIterable(
(n: Int) => (1 to 5),
(n: Int, m: Int) => n * m
).toBlocking.foreach(println)
}

@Test def concatMapExample() {
val o = Observable.items(10, 100)
o.concatMap(n => Observable.interval(200 millis).map(_ * n).take(10))
.take(20)
.toBlocking.foreach(println)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,25 @@ trait Observable[+T]
toScalaObservable[U](o5)
}

/**
* Returns a new Observable that emits items resulting from applying a function that you supply to each item
* emitted by the source Observable, where that function returns an Observable, and then emitting the items
* that result from concatinating those resulting Observables.
*
* <img width="640" height="305" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/concatMap.png">
*
* @param f a function that, when applied to an item emitted by the source Observable, returns an Observable
* @return an Observable that emits the result of applying the transformation function to each item emitted
* by the source Observable and concatinating the Observables obtained from this transformation
*/
def concatMap[R](f: T => Observable[R]): Observable[R] = {
toScalaObservable[R](asJavaObservable.concatMap[R](new Func1[T, rx.Observable[_ <: R]] {
def call(t1: T): rx.Observable[_ <: R] = {
f(t1).asJavaObservable
}
}))
}

/**
* Wraps this Observable in another Observable that ensures that the resulting
* Observable is chronologically well-behaved.
Expand Down Expand Up @@ -883,6 +902,95 @@ trait Observable[+T]
}))
}

/**
* Returns an Observable that applies a function to each item emitted or notification raised by the source
* Observable and then flattens the Observables returned from these functions and emits the resulting items.
*
* <img width="640" height="410" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/mergeMap.nce.png">
*
* @tparam R the result type
* @param onNext a function that returns an Observable to merge for each item emitted by the source Observable
* @param onError a function that returns an Observable to merge for an onError notification from the source
* Observable
* @param onCompleted a function that returns an Observable to merge for an onCompleted notification from the source
* Observable
* @return an Observable that emits the results of merging the Observables returned from applying the
* specified functions to the emissions and notifications of the source Observable
*/
def flatMap[R](onNext: T => Observable[R], onError: Throwable => Observable[R], onCompleted: () => Observable[R]): Observable[R] = {
val jOnNext = new Func1[T, rx.Observable[_ <: R]] {
override def call(t: T): rx.Observable[_ <: R] = onNext(t).asJavaObservable
}
val jOnError = new Func1[Throwable, rx.Observable[_ <: R]] {
override def call(e: Throwable): rx.Observable[_ <: R] = onError(e).asJavaObservable
}
val jOnCompleted = new Func0[rx.Observable[_ <: R]] {
override def call(): rx.Observable[_ <: R] = onCompleted().asJavaObservable
}
toScalaObservable[R](asJavaObservable.mergeMap[R](jOnNext, jOnError, jOnCompleted))
}

/**
* Returns an Observable that emits the results of a specified function to the pair of values emitted by the
* source Observable and a specified collection Observable.
*
* <img width="640" height="390" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/mergeMap.r.png">
*
* @tparam U the type of items emitted by the collection Observable
* @tparam R the type of items emitted by the resulting Observable
* @param collectionSelector a function that returns an Observable for each item emitted by the source Observable
* @param resultSelector a function that combines one item emitted by each of the source and collection Observables and
* returns an item to be emitted by the resulting Observable
* @return an Observable that emits the results of applying a function to a pair of values emitted by the
* source Observable and the collection Observable
*/
def flatMap[U, R](collectionSelector: T => Observable[U], resultSelector: (T, U) => R): Observable[R] = {
val jCollectionSelector = new Func1[T, rx.Observable[_ <: U]] {
override def call(t: T): rx.Observable[_ <: U] = collectionSelector(t).asJavaObservable
}
toScalaObservable[R](asJavaObservable.mergeMap[U, R](jCollectionSelector, resultSelector))
}

/**
* Returns an Observable that merges each item emitted by the source Observable with the values in an
* Iterable corresponding to that item that is generated by a selector.
*
* <img width="640" height="310" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/mergeMapIterable.png">
*
* @tparam R the type of item emitted by the resulting Observable
* @param collectionSelector a function that returns an Iterable sequence of values for when given an item emitted by the
* source Observable
* @return an Observable that emits the results of merging the items emitted by the source Observable with
* the values in the Iterables corresponding to those items, as generated by `collectionSelector
*/
def flatMapIterable[R](collectionSelector: T => Iterable[R]): Observable[R] = {
val jCollectionSelector = new Func1[T, java.lang.Iterable[_ <: R]] {
override def call(t: T): java.lang.Iterable[_ <: R] = collectionSelector(t).asJava
}
toScalaObservable[R](asJavaObservable.mergeMapIterable[R](jCollectionSelector))
}

/**
* Returns an Observable that emits the results of applying a function to the pair of values from the source
* Observable and an Iterable corresponding to that item that is generated by a selector.
*
* <img width="640" height="390" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/mergeMapIterable.r.png">
*
* @tparam U the collection element type
* @tparam R the type of item emited by the resulting Observable
* @param collectionSelector a function that returns an Iterable sequence of values for each item emitted by the source
* Observable
* @param resultSelector a function that returns an item based on the item emitted by the source Observable and the
* Iterable returned for that item by the `collectionSelector`
* @return an Observable that emits the items returned by `resultSelector` for each item in the source Observable
*/
def flatMapIterable[U, R](collectionSelector: T => Iterable[U], resultSelector: (T, U) => R): Observable[R] = {
val jCollectionSelector = new Func1[T, java.lang.Iterable[_ <: U]] {
override def call(t: T): java.lang.Iterable[_ <: U] = collectionSelector(t).asJava
}
toScalaObservable[R](asJavaObservable.mergeMapIterable[U, R](jCollectionSelector, resultSelector))
}

/**
* Returns an Observable that applies the given function to each item emitted by an
* Observable and emits the result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ class CompletenessTest extends JUnitSuite {
"lift(Operator[_ <: R, _ >: T])" -> "lift(Subscriber[R] => Subscriber[T])",
"limit(Int)" -> "take(Int)",
"mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]",
"mergeMap(Func1[_ >: T, _ <: Observable[_ <: R]])" -> "flatMap(T => Observable[R])",
"mergeMap(Func1[_ >: T, _ <: Observable[_ <: R]], Func1[_ >: Throwable, _ <: Observable[_ <: R]], Func0[_ <: Observable[_ <: R]])" -> "flatMap(T => Observable[R], Throwable => Observable[R], () => Observable[R])",
"mergeMap(Func1[_ >: T, _ <: Observable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R])" -> "flatMap(T => Observable[U], (T, U) => R)",
"mergeMapIterable(Func1[_ >: T, _ <: Iterable[_ <: R]])" -> "flatMapIterable(T => Iterable[R])",
"mergeMapIterable(Func1[_ >: T, _ <: Iterable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R])" -> "flatMapIterable(T => Iterable[U], (T, U) => R)",
"multicast(Subject[_ >: T, _ <: R])" -> "multicast(Subject[R])",
"multicast(Func0[_ <: Subject[_ >: T, _ <: TIntermediate]], Func1[_ >: Observable[TIntermediate], _ <: Observable[TResult]])" -> "multicast(() => Subject[R], Observable[R] => Observable[U])",
"onErrorResumeNext(Func1[Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])",
Expand Down