Skip to content

Commit

Permalink
Simplify signatures of retryWhen/repeatWhen as discussed in ReactiveX#24
Browse files Browse the repository at this point in the history


- If RxJava is updated with the same signature changes with ReactiveX/RxJava#1720, the conversion from RxScala to RxJava can be simplified, without any external signature changes in RxScala.
  • Loading branch information
jbripley committed Oct 3, 2014
1 parent a9c96b7 commit 1b95587
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 29 deletions.
41 changes: 36 additions & 5 deletions examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ class RxScalaDemo extends JUnitSuite {
println(doubleAverage(Observable.empty).toBlocking.single)
println(doubleAverage(List(0.0).toObservable).toBlocking.single)
println(doubleAverage(List(4.44).toObservable).toBlocking.single)
println(doubleAverage(List(1, 2, 3.5).toObservable).toBlocking.single)
println(doubleAverage(List(1.0, 2.0, 3.5).toObservable).toBlocking.single)
}

@Test def testSum() {
Expand Down Expand Up @@ -1132,20 +1132,51 @@ class RxScalaDemo extends JUnitSuite {
Observable[String]({ subscriber =>
println("subscribing")
subscriber.onError(new RuntimeException("always fails"))
}).retryWhen(attempts => {
attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
}).retryWhen({ throwableObservable =>
throwableObservable.zipWith(Observable.from(1 to 3))((t, i) => i).flatMap(i => {
println("delay retry by " + i + " second(s)")
Observable.timer(Duration(i, TimeUnit.SECONDS))
})
}).toBlocking.foreach(s => println(s))
}

@Test def retryWhenDifferentExceptionsExample(): Unit = {
var observableCreateCount = 1 // Just to support switching which Exception is produced
Observable[String]({ subscriber =>
println("subscribing")
if (observableCreateCount <= 2) {
subscriber.onError(new IOException("IO Fail"))
} else {
subscriber.onError(new RuntimeException("Other failure"))
}
observableCreateCount += 1
}).retryWhen({ throwableObservable =>
throwableObservable.zip(Observable.from(1 to 3)).flatMap({ case (error, retryCount) =>
error match {
// Only retry 2 times if we get a IOException and then error out with the third IOException.
// Let the other Exception's pass through and complete the Observable.
case _: IOException =>
if (retryCount <= 3) {
println("IOException delay retry by " + retryCount + " second(s)")
Observable.timer(Duration(retryCount, TimeUnit.SECONDS))
} else {
Observable.error(error)
}

case _ =>
println("got error " + error + ", will stop retrying")
Observable.empty
}
})
}).toBlocking.foreach(s => println(s))
}

@Test def repeatWhenExample(): Unit = {
Observable[String]({ subscriber =>
println("subscribing")
subscriber.onCompleted()
}).repeatWhen(attempts => {
attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
}).repeatWhen({ unitObservable =>
unitObservable.zipWith(Observable.from(1 to 3))((u, i) => i).flatMap(i => {
println("delay repeat by " + i + " second(s)")
Observable.timer(Duration(i, TimeUnit.SECONDS))
})
Expand Down
84 changes: 60 additions & 24 deletions src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3265,7 +3265,7 @@ trait Observable[+T]
/**
* Returns an Observable that emits the same values as the source observable with the exception of an
* `onError`. An `onError` notification from the source will result in the emission of a
* [[Notification]] to the Observable provided as an argument to the `notificationHandler`
* [[Throwable]] to the Observable provided as an argument to the `notificationHandler`
* function. If the Observable returned `onCompletes` or `onErrors` then `retry` will call
* `onCompleted` or `onError` on the child subscription. Otherwise, this Observable will
* resubscribe to the source Observable.
Expand All @@ -3276,54 +3276,60 @@ trait Observable[+T]
*
* This retries 3 times, each time incrementing the number of seconds it waits.
*
* <pre>
* @example
*
* This retries 3 times, each time incrementing the number of seconds it waits.
*
* {{{
* Observable[String]({ subscriber =>
* println("subscribing")
* subscriber.onError(new RuntimeException("always fails"))
* }).retryWhen(attempts => {
* attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
* }).retryWhen({ throwableObservable =>
* throwableObservable.zipWith(Observable.from(1 to 3))((t, i) => i).flatMap(i => {
* println("delay retry by " + i + " second(s)")
* Observable.timer(Duration(i, TimeUnit.SECONDS))
* })
* }).toBlocking.foreach(s => println(s))
* </pre>
* }}}
*
* Output is:
*
* <pre>
* {{{
* subscribing
* delay retry by 1 second(s)
* subscribing
* delay retry by 2 second(s)
* subscribing
* delay retry by 3 second(s)
* subscribing
* </pre>
* }}}
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>`retryWhen` operates by default on the `trampoline` [[Scheduler]].</dd>
* </dl>
*
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the
* @param notificationHandler receives an Observable of a Throwable with which a user can complete or error, aborting the
* retry
* @return the source Observable modified with retry logic
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
* @see RxScalaDemo.retryWhenDifferentExceptionsExample for a more intricate example
* @since 0.20
*/
def retryWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any]): Observable[T] = {
def retryWhen(notificationHandler: Observable[Throwable] => Observable[Any]): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on).asJavaObservable
notificationHandler(on.map({ case Notification.OnError(error) => error })).asJavaObservable
}

toScalaObservable[T](asJavaObservable.retryWhen(f))
}

/**
* Returns an Observable that emits the same values as the source observable with the exception of an `onError`.
* An onError will emit a [[Notification]] to the observable provided as an argument to the notificationHandler
* func. If the observable returned `onCompletes` or `onErrors` then retry will call `onCompleted`
* An onError will emit a [[Throwable]] to the Observable provided as an argument to the notificationHandler
* func. If the Observable returned `onCompletes` or `onErrors` then retry will call `onCompleted`
* or `onError` on the child subscription. Otherwise, this observable will resubscribe to the source observable, on a particular Scheduler.
* <p>
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/retryWhen.f.png" alt="">
Expand All @@ -3333,18 +3339,19 @@ trait Observable[+T]
* <dd>you specify which [[Scheduler]] this operator will use</dd>
* </dl>
*
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the
* retry
* @param notificationHandler receives an Observable of a Throwable with which a user can complete or error, aborting
* the retry
* @param scheduler the Scheduler on which to subscribe to the source Observable
* @return the source Observable modified with retry logic
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
* @see RxScalaDemo.retryWhenDifferentExceptionsExample for a more intricate example
* @since 0.20
*/
def retryWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any], scheduler: Scheduler): Observable[T] = {
def retryWhen(notificationHandler: Observable[Throwable] => Observable[Any], scheduler: Scheduler): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on).asJavaObservable
notificationHandler(on.map({ case Notification.OnError(error) => error })).asJavaObservable
}

toScalaObservable[T](asJavaObservable.retryWhen(f, scheduler))
Expand Down Expand Up @@ -3415,7 +3422,7 @@ trait Observable[+T]
/**
* Returns an Observable that emits the same values as the source Observable with the exception of an
* `onCompleted`. An `onCompleted` notification from the source will result in the emission of
* a [[Notification]] to the Observable provided as an argument to the `notificationHandler`
* a [[scala.Unit]] to the Observable provided as an argument to the `notificationHandler`
* function. If the Observable returned `onCompletes` or `onErrors` then `repeatWhen` will
* call `onCompleted` or `onError` on the child subscription. Otherwise, this Observable will
* resubscribe to the source Observable, on a particular Scheduler.
Expand All @@ -3426,18 +3433,18 @@ trait Observable[+T]
* <dd>you specify which [[Scheduler]] this operator will use</dd>
* </dl>
*
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the repeat.
* @param notificationHandler receives an Observable of a Unit with which a user can complete or error, aborting the repeat.
* @param scheduler the Scheduler to emit the items on
* @return the source Observable modified with repeat logic
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
* @since 0.20
*/
def repeatWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any], scheduler: Scheduler): Observable[T] = {
def repeatWhen(notificationHandler: Observable[Unit] => Observable[Any], scheduler: Scheduler): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on).asJavaObservable
notificationHandler(on.map( _ => Unit )).asJavaObservable
}

toScalaObservable[T](asJavaObservable.repeatWhen(f, scheduler))
Expand All @@ -3446,28 +3453,57 @@ trait Observable[+T]
/**
* Returns an Observable that emits the same values as the source Observable with the exception of an
* `onCompleted`. An `onCompleted` notification from the source will result in the emission of
* a [[Notification]] to the Observable provided as an argument to the `notificationHandler`
* a [[scala.Unit]] to the Observable provided as an argument to the `notificationHandler`
* function. If the Observable returned `onCompletes` or `onErrors` then `repeatWhen` will
* call `onCompleted` or `onError` on the child subscription. Otherwise, this Observable will
* resubscribe to the source observable.
* <p>
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/repeatWhen.f.png" alt="">
*
* @example
*
* This repeats 3 times, each time incrementing the number of seconds it waits.
*
* {{{
* Observable[String]({ subscriber =>
* println("subscribing")
* subscriber.onCompleted()
* }).repeatWhen({ unitObservable =>
* unitObservable.zipWith(Observable.from(1 to 3))((u, i) => i).flatMap(i => {
* println("delay repeat by " + i + " second(s)")
* Observable.timer(Duration(i, TimeUnit.SECONDS))
* })
* }).toBlocking.foreach(s => println(s))
* }}}
*
* Output is:
*
* {{{
* subscribing
* delay repeat by 1 second(s)
* subscribing
* delay repeat by 2 second(s)
* subscribing
* delay repeat by 3 second(s)
* subscribing
* }}}
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>`repeatWhen` operates by default on the `trampoline` [[Scheduler]].</dd>
* </dl>
*
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the repeat.
* @param notificationHandler receives an Observable of a Unit with which a user can complete or error, aborting the repeat.
* @return the source Observable modified with repeat logic
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
* @since 0.20
*/
def repeatWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any]): Observable[T] = {
def repeatWhen(notificationHandler: Observable[Unit] => Observable[Any]): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on).asJavaObservable
notificationHandler(on.map( _ => Unit )).asJavaObservable
}

toScalaObservable[T](asJavaObservable.repeatWhen(f))
Expand Down

0 comments on commit 1b95587

Please sign in to comment.