diff --git a/build.sbt b/build.sbt index 3d4f09b1..390e4c9d 100644 --- a/build.sbt +++ b/build.sbt @@ -13,7 +13,7 @@ scalaVersion in ThisBuild := "2.11.2" crossScalaVersions in ThisBuild := Seq("2.10.4", "2.11.2") libraryDependencies ++= Seq( - "io.reactivex" % "rxjava" % "1.0.0-rc.4", + "io.reactivex" % "rxjava" % "1.0.0-rc.5", "org.mockito" % "mockito-core" % "1.9.5" % "test", "junit" % "junit" % "4.11" % "test", "org.scalatest" %% "scalatest" % "2.2.2" % "test") diff --git a/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala b/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala index ea5b8ac1..9b1c3828 100755 --- a/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -391,20 +391,6 @@ class RxScalaDemo extends JUnitSuite { waitFor(Olympics.yearTicks) } - @Test def groupByUntilExample() { - val numbers = Observable.interval(250 millis).take(14) - val grouped = numbers.groupByUntil(x => x % 2){ case (key, obs) => obs.filter(x => x == 7) } - val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten - sequenced.subscribe(x => println(s"Emitted group: $x")) - } - - @Test def groupByUntilExample2() { - val numbers = Observable.interval(250 millis).take(14) - val grouped = numbers.groupByUntil(x => x % 2, x => x * 10){ case (key, obs) => Observable.interval(2 seconds) } - val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten - sequenced.toBlocking.foreach(x => println(s"Emitted group: $x")) - } - @Test def combineLatestExample() { val firstCounter = Observable.interval(250 millis) val secondCounter = Observable.interval(550 millis) @@ -554,7 +540,7 @@ class RxScalaDemo extends JUnitSuite { println(doubleAverage(Observable.empty).toBlocking.single) println(doubleAverage(List(0.0).toObservable).toBlocking.single) println(doubleAverage(List(4.44).toObservable).toBlocking.single) - println(doubleAverage(List(1, 2, 3.5).toObservable).toBlocking.single) + println(doubleAverage(List(1.0, 2.0, 3.5).toObservable).toBlocking.single) } @Test def testSum() { @@ -1115,20 +1101,51 @@ class RxScalaDemo extends JUnitSuite { Observable[String]({ subscriber => println("subscribing") subscriber.onError(new RuntimeException("always fails")) - }).retryWhen(attempts => { - attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => { + }).retryWhen({ throwableObservable => + throwableObservable.zipWith(Observable.from(1 to 3))((t, i) => i).flatMap(i => { println("delay retry by " + i + " second(s)") Observable.timer(Duration(i, TimeUnit.SECONDS)) }) }).toBlocking.foreach(s => println(s)) } + @Test def retryWhenDifferentExceptionsExample(): Unit = { + var observableCreateCount = 1 // Just to support switching which Exception is produced + Observable[String]({ subscriber => + println("subscribing") + if (observableCreateCount <= 2) { + subscriber.onError(new IOException("IO Fail")) + } else { + subscriber.onError(new RuntimeException("Other failure")) + } + observableCreateCount += 1 + }).retryWhen({ throwableObservable => + throwableObservable.zip(Observable.from(1 to 3)).flatMap({ case (error, retryCount) => + error match { + // Only retry 2 times if we get a IOException and then error out with the third IOException. + // Let the other Exception's pass through and complete the Observable. + case _: IOException => + if (retryCount <= 3) { + println("IOException delay retry by " + retryCount + " second(s)") + Observable.timer(Duration(retryCount, TimeUnit.SECONDS)) + } else { + Observable.error(error) + } + + case _ => + println("got error " + error + ", will stop retrying") + Observable.empty + } + }) + }).toBlocking.foreach(s => println(s)) + } + @Test def repeatWhenExample(): Unit = { Observable[String]({ subscriber => println("subscribing") subscriber.onCompleted() - }).repeatWhen(attempts => { - attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => { + }).repeatWhen({ unitObservable => + unitObservable.zipWith(Observable.from(1 to 3))((u, i) => i).flatMap(i => { println("delay repeat by " + i + " second(s)") Observable.timer(Duration(i, TimeUnit.SECONDS)) }) diff --git a/src/main/scala/rx/lang/scala/Observable.scala b/src/main/scala/rx/lang/scala/Observable.scala index 0c87b6d7..5c75863d 100755 --- a/src/main/scala/rx/lang/scala/Observable.scala +++ b/src/main/scala/rx/lang/scala/Observable.scala @@ -2260,57 +2260,6 @@ trait Observable[+T] } } - /** - * Groups the items emitted by this Observable according to a specified discriminator function and terminates these groups - * according to a function. - * - * @param f - * a function that extracts the key from an item - * @param closings - * the function that accepts the key of a given group and an observable representing that group, and returns - * an observable that emits a single Closing when the group should be closed. - * @tparam K - * the type of the keys returned by the discriminator function. - * @return an Observable that emits `(key, observable)` pairs, where `observable` - * contains all items for which `f` returned `key` before `closings` emits a value. - */ - def groupByUntil[K](f: T => K)(closings: (K, Observable[T])=>Observable[Any]): Observable[(K, Observable[T])] = { - val fclosing: Func1[_ >: rx.observables.GroupedObservable[K, _ <: T], _ <: rx.Observable[_ <: Any]] = - (jGrObs: rx.observables.GroupedObservable[K, _ <: T]) => closings(jGrObs.getKey, toScalaObservable[T](jGrObs)).asJavaObservable - val o1 = asJavaObservable.groupByUntil[K, Any](f, fclosing) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]] - val func = (o: rx.observables.GroupedObservable[K, _ <: T]) => (o.getKey, toScalaObservable[T](o)) - toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func)) - } - - /** - * Groups the items emitted by an [[Observable]] (transformed by a selector) according to a specified key selector function - * until the duration Observable expires for the key. - * - * - * - * Note: The `Observable` in the pair `(K, Observable[V])` will cache the items it is to emit until such time as it - * is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those `Observable` that - * do not concern you. Instead, you can signal to them that they may discard their buffers by applying an operator like `take(0)` to them. - * - * @param keySelector a function to extract the key for each item - * @param valueSelector a function to map each item emitted by the source [[Observable]] to an item emitted by one - * of the resulting `Observable[V]`s - * @param closings a function to signal the expiration of a group - * @return an [[Observable]] that emits pairs of key and `Observable[V]`, each of which corresponds to a key - * value and each of which emits all items emitted by the source [[Observable]] during that - * key's duration that share that same key value, transformed by the value selector - */ - def groupByUntil[K, V](keySelector: T => K, valueSelector: T => V)(closings: (K, Observable[V]) => Observable[Any]): Observable[(K, Observable[V])] = { - val jKeySelector: Func1[_ >: T, _ <: K] = keySelector - val jValueSelector: Func1[_ >: T, _ <: V] = valueSelector - val jDurationSelector = new Func1[rx.observables.GroupedObservable[_ <: K, _ <: V], rx.Observable[_ <: Any]] { - override def call(jgo: rx.observables.GroupedObservable[_ <: K, _ <: V]): rx.Observable[_ <: Any] = closings(jgo.getKey, toScalaObservable[V](jgo)) - } - val f = (o: rx.observables.GroupedObservable[K, _ <: V]) => (o.getKey, toScalaObservable[V](o)) - val jo = asJavaObservable.groupByUntil[K, V, Any](jKeySelector, jValueSelector, jDurationSelector).map[(K, Observable[V])](f) - toScalaObservable[(K, Observable[V])](jo) - } - /** * Correlates the items emitted by two Observables based on overlapping durations. *

@@ -3265,7 +3214,7 @@ trait Observable[+T] /** * Returns an Observable that emits the same values as the source observable with the exception of an * `onError`. An `onError` notification from the source will result in the emission of a - * [[Notification]] to the Observable provided as an argument to the `notificationHandler` + * [[Throwable]] to the Observable provided as an argument to the `notificationHandler` * function. If the Observable returned `onCompletes` or `onErrors` then `retry` will call * `onCompleted` or `onError` on the child subscription. Otherwise, this Observable will * resubscribe to the source Observable. @@ -3276,21 +3225,25 @@ trait Observable[+T] * * This retries 3 times, each time incrementing the number of seconds it waits. * - *

+   * @example
+   *
+   * This retries 3 times, each time incrementing the number of seconds it waits.
+   *
+   * {{{
    * Observable[String]({ subscriber =>
    *   println("subscribing")
    *   subscriber.onError(new RuntimeException("always fails"))
-   * }).retryWhen(attempts => {
-   *   attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
+   * }).retryWhen({ throwableObservable =>
+   *   throwableObservable.zipWith(Observable.from(1 to 3))((t, i) => i).flatMap(i => {
    *     println("delay retry by " + i + " second(s)")
    *     Observable.timer(Duration(i, TimeUnit.SECONDS))
    *   })
    * }).toBlocking.foreach(s => println(s))
-   * 
+ * }}} * * Output is: * - *
+   * {{{
    * subscribing
    * delay retry by 1 second(s)
    * subscribing
@@ -3298,23 +3251,25 @@ trait Observable[+T]
    * subscribing
    * delay retry by 3 second(s)
    * subscribing
-   * 
+ * }}} + * *
*
Scheduler:
*
`retryWhen` operates by default on the `trampoline` [[Scheduler]].
*
* - * @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the + * @param notificationHandler receives an Observable of a Throwable with which a user can complete or error, aborting the * retry * @return the source Observable modified with retry logic * @see RxJava Wiki: retryWhen() + * @see RxScalaDemo.retryWhenDifferentExceptionsExample for a more intricate example * @since 0.20 */ - def retryWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any]): Observable[T] = { - val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] = - (jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => { - val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) }) - notificationHandler(on).asJavaObservable + def retryWhen(notificationHandler: Observable[Throwable] => Observable[Any]): Observable[T] = { + val f: Func1[_ >: rx.Observable[_ <: Throwable], _ <: rx.Observable[_ <: Any]] = + (jOt: rx.Observable[_ <: Throwable]) => { + val ot = toScalaObservable[Throwable](jOt) + notificationHandler(ot).asJavaObservable } toScalaObservable[T](asJavaObservable.retryWhen(f)) @@ -3322,8 +3277,8 @@ trait Observable[+T] /** * Returns an Observable that emits the same values as the source observable with the exception of an `onError`. - * An onError will emit a [[Notification]] to the observable provided as an argument to the notificationHandler - * func. If the observable returned `onCompletes` or `onErrors` then retry will call `onCompleted` + * An onError will emit a [[Throwable]] to the Observable provided as an argument to the notificationHandler + * func. If the Observable returned `onCompletes` or `onErrors` then retry will call `onCompleted` * or `onError` on the child subscription. Otherwise, this observable will resubscribe to the source observable, on a particular Scheduler. *

* @@ -3333,18 +3288,19 @@ trait Observable[+T] *

you specify which [[Scheduler]] this operator will use
* * - * @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the - * retry + * @param notificationHandler receives an Observable of a Throwable with which a user can complete or error, aborting + * the retry * @param scheduler the Scheduler on which to subscribe to the source Observable * @return the source Observable modified with retry logic * @see RxJava Wiki: retryWhen() + * @see RxScalaDemo.retryWhenDifferentExceptionsExample for a more intricate example * @since 0.20 */ - def retryWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any], scheduler: Scheduler): Observable[T] = { - val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] = - (jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => { - val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) }) - notificationHandler(on).asJavaObservable + def retryWhen(notificationHandler: Observable[Throwable] => Observable[Any], scheduler: Scheduler): Observable[T] = { + val f: Func1[_ >: rx.Observable[_ <: Throwable], _ <: rx.Observable[_ <: Any]] = + (jOt: rx.Observable[_ <: Throwable]) => { + val ot = toScalaObservable[Throwable](jOt) + notificationHandler(ot).asJavaObservable } toScalaObservable[T](asJavaObservable.retryWhen(f, scheduler)) @@ -3415,7 +3371,7 @@ trait Observable[+T] /** * Returns an Observable that emits the same values as the source Observable with the exception of an * `onCompleted`. An `onCompleted` notification from the source will result in the emission of - * a [[Notification]] to the Observable provided as an argument to the `notificationHandler` + * a [[scala.Unit]] to the Observable provided as an argument to the `notificationHandler` * function. If the Observable returned `onCompletes` or `onErrors` then `repeatWhen` will * call `onCompleted` or `onError` on the child subscription. Otherwise, this Observable will * resubscribe to the source Observable, on a particular Scheduler. @@ -3426,18 +3382,18 @@ trait Observable[+T] *
you specify which [[Scheduler]] this operator will use
* * - * @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the repeat. + * @param notificationHandler receives an Observable of a Unit with which a user can complete or error, aborting the repeat. * @param scheduler the Scheduler to emit the items on * @return the source Observable modified with repeat logic * @see RxJava Wiki: repeatWhen() * @see MSDN: Observable.Repeat * @since 0.20 */ - def repeatWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any], scheduler: Scheduler): Observable[T] = { - val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] = - (jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => { - val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) }) - notificationHandler(on).asJavaObservable + def repeatWhen(notificationHandler: Observable[Unit] => Observable[Any], scheduler: Scheduler): Observable[T] = { + val f: Func1[_ >: rx.Observable[_ <: Void], _ <: rx.Observable[_ <: Any]] = + (jOv: rx.Observable[_ <: Void]) => { + val ov = toScalaObservable[Void](jOv) + notificationHandler(ov.map( _ => Unit )).asJavaObservable } toScalaObservable[T](asJavaObservable.repeatWhen(f, scheduler)) @@ -3446,28 +3402,57 @@ trait Observable[+T] /** * Returns an Observable that emits the same values as the source Observable with the exception of an * `onCompleted`. An `onCompleted` notification from the source will result in the emission of - * a [[Notification]] to the Observable provided as an argument to the `notificationHandler` + * a [[scala.Unit]] to the Observable provided as an argument to the `notificationHandler` * function. If the Observable returned `onCompletes` or `onErrors` then `repeatWhen` will * call `onCompleted` or `onError` on the child subscription. Otherwise, this Observable will * resubscribe to the source observable. *

* + * + * @example + * + * This repeats 3 times, each time incrementing the number of seconds it waits. + * + * {{{ + * Observable[String]({ subscriber => + * println("subscribing") + * subscriber.onCompleted() + * }).repeatWhen({ unitObservable => + * unitObservable.zipWith(Observable.from(1 to 3))((u, i) => i).flatMap(i => { + * println("delay repeat by " + i + " second(s)") + * Observable.timer(Duration(i, TimeUnit.SECONDS)) + * }) + * }).toBlocking.foreach(s => println(s)) + * }}} + * + * Output is: + * + * {{{ + * subscribing + * delay repeat by 1 second(s) + * subscribing + * delay repeat by 2 second(s) + * subscribing + * delay repeat by 3 second(s) + * subscribing + * }}} + * *

*
Scheduler:
*
`repeatWhen` operates by default on the `trampoline` [[Scheduler]].
*
* - * @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the repeat. + * @param notificationHandler receives an Observable of a Unit with which a user can complete or error, aborting the repeat. * @return the source Observable modified with repeat logic * @see RxJava Wiki: repeatWhen() * @see MSDN: Observable.Repeat * @since 0.20 */ - def repeatWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any]): Observable[T] = { - val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] = - (jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => { - val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) }) - notificationHandler(on).asJavaObservable + def repeatWhen(notificationHandler: Observable[Unit] => Observable[Any]): Observable[T] = { + val f: Func1[_ >: rx.Observable[_ <: Void], _ <: rx.Observable[_ <: Any]] = + (jOv: rx.Observable[_ <: Void]) => { + val ov = toScalaObservable[Void](jOv) + notificationHandler(ov.map( _ => Unit )).asJavaObservable } toScalaObservable[T](asJavaObservable.repeatWhen(f)) diff --git a/src/test/scala/rx/lang/scala/CompletenessTest.scala b/src/test/scala/rx/lang/scala/CompletenessTest.scala index 21864724..f9900c15 100644 --- a/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -102,8 +102,6 @@ class CompletenessTest extends JUnitSuite { "forEach(Action1[_ >: T])" -> "foreach(T => Unit)", "forEach(Action1[_ >: T], Action1[Throwable])" -> "foreach(T => Unit, Throwable => Unit)", "forEach(Action1[_ >: T], Action1[Throwable], Action0)" -> "foreach(T => Unit, Throwable => Unit, () => Unit)", - "groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K)((K, Observable[T]) => Observable[Any])", - "groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: T, _ <: TValue], Func1[_ >: GroupedObservable[TKey, TValue], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, T => V)((K, Observable[V]) => Observable[Any])", "groupJoin(Observable[T2], Func1[_ >: T, _ <: Observable[D1]], Func1[_ >: T2, _ <: Observable[D2]], Func2[_ >: T, _ >: Observable[T2], _ <: R])" -> "groupJoin(Observable[S])(T => Observable[Any], S => Observable[Any], (T, Observable[S]) => R)", "ignoreElements()" -> "[use `filter(_ => false)`]", "join(Observable[TRight], Func1[T, Observable[TLeftDuration]], Func1[TRight, Observable[TRightDuration]], Func2[T, TRight, R])" -> "join(Observable[S])(T => Observable[Any], S => Observable[Any], (T, S) => R)", @@ -128,8 +126,8 @@ class CompletenessTest extends JUnitSuite { "publishLast(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "publishLast(Observable[T] => Observable[R])", "reduce(Func2[T, T, T])" -> "reduce((U, U) => U)", "reduce(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)", - "repeatWhen(Func1[_ >: Observable[_ <: Notification[_]], _ <: Observable[_]])" -> "repeatWhen(Observable[Notification[Any]] => Observable[Any])", - "repeatWhen(Func1[_ >: Observable[_ <: Notification[_]], _ <: Observable[_]], Scheduler)" -> "repeatWhen(Observable[Notification[Any]] => Observable[Any], Scheduler)", + "repeatWhen(Func1[_ >: Observable[_ <: Void], _ <: Observable[_]])" -> "repeatWhen(Observable[Unit] => Observable[Any])", + "repeatWhen(Func1[_ >: Observable[_ <: Void], _ <: Observable[_]], Scheduler)" -> "repeatWhen(Observable[Unit] => Observable[Any], Scheduler)", "replay(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "replay(Observable[T] => Observable[R])", "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Int)" -> "replay(Observable[T] => Observable[R], Int)", "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Int, Long, TimeUnit)" -> "replay(Observable[T] => Observable[R], Int, Duration)", @@ -139,8 +137,8 @@ class CompletenessTest extends JUnitSuite { "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Long, TimeUnit, Scheduler)" -> "replay(Observable[T] => Observable[R], Duration, Scheduler)", "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Scheduler)" -> "replay(Observable[T] => Observable[R], Scheduler)", "retry(Func2[Integer, Throwable, Boolean])" -> "retry((Int, Throwable) => Boolean)", - "retryWhen(Func1[_ >: Observable[_ <: Notification[_]], _ <: Observable[_]], Scheduler)" -> "retryWhen(Observable[Notification[Any]] => Observable[Any], Scheduler)", - "retryWhen(Func1[_ >: Observable[_ <: Notification[_]], _ <: Observable[_]])" -> "retryWhen(Observable[Notification[Any]] => Observable[Any])", + "retryWhen(Func1[_ >: Observable[_ <: Throwable], _ <: Observable[_]], Scheduler)" -> "retryWhen(Observable[Throwable] => Observable[Any], Scheduler)", + "retryWhen(Func1[_ >: Observable[_ <: Throwable], _ <: Observable[_]])" -> "retryWhen(Observable[Throwable] => Observable[Any])", "sample(Observable[U])" -> "sample(Observable[Any])", "scan(Func2[T, T, T])" -> unnecessary, "scan(R, Func2[R, _ >: T, R])" -> "scan(R)((R, T) => R)",