Skip to content

Commit

Permalink
Merge pull request ReactiveX#27 from jbripley/retryrepeatwhen-updates…
Browse files Browse the repository at this point in the history
…ignatures

Upgrade to RxJava 1.0.0-RC5
  • Loading branch information
benjchristensen committed Oct 8, 2014
2 parents 8982cff + 445a85c commit 7b1d202
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 113 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ scalaVersion in ThisBuild := "2.11.2"
crossScalaVersions in ThisBuild := Seq("2.10.4", "2.11.2")

libraryDependencies ++= Seq(
"io.reactivex" % "rxjava" % "1.0.0-rc.4",
"io.reactivex" % "rxjava" % "1.0.0-rc.5",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"junit" % "junit" % "4.11" % "test",
"org.scalatest" %% "scalatest" % "2.2.2" % "test")
55 changes: 36 additions & 19 deletions examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -391,20 +391,6 @@ class RxScalaDemo extends JUnitSuite {
waitFor(Olympics.yearTicks)
}

@Test def groupByUntilExample() {
val numbers = Observable.interval(250 millis).take(14)
val grouped = numbers.groupByUntil(x => x % 2){ case (key, obs) => obs.filter(x => x == 7) }
val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten
sequenced.subscribe(x => println(s"Emitted group: $x"))
}

@Test def groupByUntilExample2() {
val numbers = Observable.interval(250 millis).take(14)
val grouped = numbers.groupByUntil(x => x % 2, x => x * 10){ case (key, obs) => Observable.interval(2 seconds) }
val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten
sequenced.toBlocking.foreach(x => println(s"Emitted group: $x"))
}

@Test def combineLatestExample() {
val firstCounter = Observable.interval(250 millis)
val secondCounter = Observable.interval(550 millis)
Expand Down Expand Up @@ -554,7 +540,7 @@ class RxScalaDemo extends JUnitSuite {
println(doubleAverage(Observable.empty).toBlocking.single)
println(doubleAverage(List(0.0).toObservable).toBlocking.single)
println(doubleAverage(List(4.44).toObservable).toBlocking.single)
println(doubleAverage(List(1, 2, 3.5).toObservable).toBlocking.single)
println(doubleAverage(List(1.0, 2.0, 3.5).toObservable).toBlocking.single)
}

@Test def testSum() {
Expand Down Expand Up @@ -1115,20 +1101,51 @@ class RxScalaDemo extends JUnitSuite {
Observable[String]({ subscriber =>
println("subscribing")
subscriber.onError(new RuntimeException("always fails"))
}).retryWhen(attempts => {
attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
}).retryWhen({ throwableObservable =>
throwableObservable.zipWith(Observable.from(1 to 3))((t, i) => i).flatMap(i => {
println("delay retry by " + i + " second(s)")
Observable.timer(Duration(i, TimeUnit.SECONDS))
})
}).toBlocking.foreach(s => println(s))
}

@Test def retryWhenDifferentExceptionsExample(): Unit = {
var observableCreateCount = 1 // Just to support switching which Exception is produced
Observable[String]({ subscriber =>
println("subscribing")
if (observableCreateCount <= 2) {
subscriber.onError(new IOException("IO Fail"))
} else {
subscriber.onError(new RuntimeException("Other failure"))
}
observableCreateCount += 1
}).retryWhen({ throwableObservable =>
throwableObservable.zip(Observable.from(1 to 3)).flatMap({ case (error, retryCount) =>
error match {
// Only retry 2 times if we get a IOException and then error out with the third IOException.
// Let the other Exception's pass through and complete the Observable.
case _: IOException =>
if (retryCount <= 3) {
println("IOException delay retry by " + retryCount + " second(s)")
Observable.timer(Duration(retryCount, TimeUnit.SECONDS))
} else {
Observable.error(error)
}

case _ =>
println("got error " + error + ", will stop retrying")
Observable.empty
}
})
}).toBlocking.foreach(s => println(s))
}

@Test def repeatWhenExample(): Unit = {
Observable[String]({ subscriber =>
println("subscribing")
subscriber.onCompleted()
}).repeatWhen(attempts => {
attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
}).repeatWhen({ unitObservable =>
unitObservable.zipWith(Observable.from(1 to 3))((u, i) => i).flatMap(i => {
println("delay repeat by " + i + " second(s)")
Observable.timer(Duration(i, TimeUnit.SECONDS))
})
Expand Down
159 changes: 72 additions & 87 deletions src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2260,57 +2260,6 @@ trait Observable[+T]
}
}

/**
* Groups the items emitted by this Observable according to a specified discriminator function and terminates these groups
* according to a function.
*
* @param f
* a function that extracts the key from an item
* @param closings
* the function that accepts the key of a given group and an observable representing that group, and returns
* an observable that emits a single Closing when the group should be closed.
* @tparam K
* the type of the keys returned by the discriminator function.
* @return an Observable that emits `(key, observable)` pairs, where `observable`
* contains all items for which `f` returned `key` before `closings` emits a value.
*/
def groupByUntil[K](f: T => K)(closings: (K, Observable[T])=>Observable[Any]): Observable[(K, Observable[T])] = {
val fclosing: Func1[_ >: rx.observables.GroupedObservable[K, _ <: T], _ <: rx.Observable[_ <: Any]] =
(jGrObs: rx.observables.GroupedObservable[K, _ <: T]) => closings(jGrObs.getKey, toScalaObservable[T](jGrObs)).asJavaObservable
val o1 = asJavaObservable.groupByUntil[K, Any](f, fclosing) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]]
val func = (o: rx.observables.GroupedObservable[K, _ <: T]) => (o.getKey, toScalaObservable[T](o))
toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
}

/**
* Groups the items emitted by an [[Observable]] (transformed by a selector) according to a specified key selector function
* until the duration Observable expires for the key.
*
* <img width="640" height="375" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/groupByUntil.png">
*
* <em>Note:</em> The `Observable` in the pair `(K, Observable[V])` will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those `Observable` that
* do not concern you. Instead, you can signal to them that they may discard their buffers by applying an operator like `take(0)` to them.
*
* @param keySelector a function to extract the key for each item
* @param valueSelector a function to map each item emitted by the source [[Observable]] to an item emitted by one
* of the resulting `Observable[V]`s
* @param closings a function to signal the expiration of a group
* @return an [[Observable]] that emits pairs of key and `Observable[V]`, each of which corresponds to a key
* value and each of which emits all items emitted by the source [[Observable]] during that
* key's duration that share that same key value, transformed by the value selector
*/
def groupByUntil[K, V](keySelector: T => K, valueSelector: T => V)(closings: (K, Observable[V]) => Observable[Any]): Observable[(K, Observable[V])] = {
val jKeySelector: Func1[_ >: T, _ <: K] = keySelector
val jValueSelector: Func1[_ >: T, _ <: V] = valueSelector
val jDurationSelector = new Func1[rx.observables.GroupedObservable[_ <: K, _ <: V], rx.Observable[_ <: Any]] {
override def call(jgo: rx.observables.GroupedObservable[_ <: K, _ <: V]): rx.Observable[_ <: Any] = closings(jgo.getKey, toScalaObservable[V](jgo))
}
val f = (o: rx.observables.GroupedObservable[K, _ <: V]) => (o.getKey, toScalaObservable[V](o))
val jo = asJavaObservable.groupByUntil[K, V, Any](jKeySelector, jValueSelector, jDurationSelector).map[(K, Observable[V])](f)
toScalaObservable[(K, Observable[V])](jo)
}

/**
* Correlates the items emitted by two Observables based on overlapping durations.
* <p>
Expand Down Expand Up @@ -3265,7 +3214,7 @@ trait Observable[+T]
/**
* Returns an Observable that emits the same values as the source observable with the exception of an
* `onError`. An `onError` notification from the source will result in the emission of a
* [[Notification]] to the Observable provided as an argument to the `notificationHandler`
* [[Throwable]] to the Observable provided as an argument to the `notificationHandler`
* function. If the Observable returned `onCompletes` or `onErrors` then `retry` will call
* `onCompleted` or `onError` on the child subscription. Otherwise, this Observable will
* resubscribe to the source Observable.
Expand All @@ -3276,54 +3225,60 @@ trait Observable[+T]
*
* This retries 3 times, each time incrementing the number of seconds it waits.
*
* <pre>
* @example
*
* This retries 3 times, each time incrementing the number of seconds it waits.
*
* {{{
* Observable[String]({ subscriber =>
* println("subscribing")
* subscriber.onError(new RuntimeException("always fails"))
* }).retryWhen(attempts => {
* attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
* }).retryWhen({ throwableObservable =>
* throwableObservable.zipWith(Observable.from(1 to 3))((t, i) => i).flatMap(i => {
* println("delay retry by " + i + " second(s)")
* Observable.timer(Duration(i, TimeUnit.SECONDS))
* })
* }).toBlocking.foreach(s => println(s))
* </pre>
* }}}
*
* Output is:
*
* <pre>
* {{{
* subscribing
* delay retry by 1 second(s)
* subscribing
* delay retry by 2 second(s)
* subscribing
* delay retry by 3 second(s)
* subscribing
* </pre>
* }}}
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>`retryWhen` operates by default on the `trampoline` [[Scheduler]].</dd>
* </dl>
*
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the
* @param notificationHandler receives an Observable of a Throwable with which a user can complete or error, aborting the
* retry
* @return the source Observable modified with retry logic
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
* @see RxScalaDemo.retryWhenDifferentExceptionsExample for a more intricate example
* @since 0.20
*/
def retryWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any]): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on).asJavaObservable
def retryWhen(notificationHandler: Observable[Throwable] => Observable[Any]): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: Throwable], _ <: rx.Observable[_ <: Any]] =
(jOt: rx.Observable[_ <: Throwable]) => {
val ot = toScalaObservable[Throwable](jOt)
notificationHandler(ot).asJavaObservable
}

toScalaObservable[T](asJavaObservable.retryWhen(f))
}

/**
* Returns an Observable that emits the same values as the source observable with the exception of an `onError`.
* An onError will emit a [[Notification]] to the observable provided as an argument to the notificationHandler
* func. If the observable returned `onCompletes` or `onErrors` then retry will call `onCompleted`
* An onError will emit a [[Throwable]] to the Observable provided as an argument to the notificationHandler
* func. If the Observable returned `onCompletes` or `onErrors` then retry will call `onCompleted`
* or `onError` on the child subscription. Otherwise, this observable will resubscribe to the source observable, on a particular Scheduler.
* <p>
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/retryWhen.f.png" alt="">
Expand All @@ -3333,18 +3288,19 @@ trait Observable[+T]
* <dd>you specify which [[Scheduler]] this operator will use</dd>
* </dl>
*
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the
* retry
* @param notificationHandler receives an Observable of a Throwable with which a user can complete or error, aborting
* the retry
* @param scheduler the Scheduler on which to subscribe to the source Observable
* @return the source Observable modified with retry logic
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
* @see RxScalaDemo.retryWhenDifferentExceptionsExample for a more intricate example
* @since 0.20
*/
def retryWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any], scheduler: Scheduler): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on).asJavaObservable
def retryWhen(notificationHandler: Observable[Throwable] => Observable[Any], scheduler: Scheduler): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: Throwable], _ <: rx.Observable[_ <: Any]] =
(jOt: rx.Observable[_ <: Throwable]) => {
val ot = toScalaObservable[Throwable](jOt)
notificationHandler(ot).asJavaObservable
}

toScalaObservable[T](asJavaObservable.retryWhen(f, scheduler))
Expand Down Expand Up @@ -3415,7 +3371,7 @@ trait Observable[+T]
/**
* Returns an Observable that emits the same values as the source Observable with the exception of an
* `onCompleted`. An `onCompleted` notification from the source will result in the emission of
* a [[Notification]] to the Observable provided as an argument to the `notificationHandler`
* a [[scala.Unit]] to the Observable provided as an argument to the `notificationHandler`
* function. If the Observable returned `onCompletes` or `onErrors` then `repeatWhen` will
* call `onCompleted` or `onError` on the child subscription. Otherwise, this Observable will
* resubscribe to the source Observable, on a particular Scheduler.
Expand All @@ -3426,18 +3382,18 @@ trait Observable[+T]
* <dd>you specify which [[Scheduler]] this operator will use</dd>
* </dl>
*
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the repeat.
* @param notificationHandler receives an Observable of a Unit with which a user can complete or error, aborting the repeat.
* @param scheduler the Scheduler to emit the items on
* @return the source Observable modified with repeat logic
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
* @since 0.20
*/
def repeatWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any], scheduler: Scheduler): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on).asJavaObservable
def repeatWhen(notificationHandler: Observable[Unit] => Observable[Any], scheduler: Scheduler): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: Void], _ <: rx.Observable[_ <: Any]] =
(jOv: rx.Observable[_ <: Void]) => {
val ov = toScalaObservable[Void](jOv)
notificationHandler(ov.map( _ => Unit )).asJavaObservable
}

toScalaObservable[T](asJavaObservable.repeatWhen(f, scheduler))
Expand All @@ -3446,28 +3402,57 @@ trait Observable[+T]
/**
* Returns an Observable that emits the same values as the source Observable with the exception of an
* `onCompleted`. An `onCompleted` notification from the source will result in the emission of
* a [[Notification]] to the Observable provided as an argument to the `notificationHandler`
* a [[scala.Unit]] to the Observable provided as an argument to the `notificationHandler`
* function. If the Observable returned `onCompletes` or `onErrors` then `repeatWhen` will
* call `onCompleted` or `onError` on the child subscription. Otherwise, this Observable will
* resubscribe to the source observable.
* <p>
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/repeatWhen.f.png" alt="">
*
* @example
*
* This repeats 3 times, each time incrementing the number of seconds it waits.
*
* {{{
* Observable[String]({ subscriber =>
* println("subscribing")
* subscriber.onCompleted()
* }).repeatWhen({ unitObservable =>
* unitObservable.zipWith(Observable.from(1 to 3))((u, i) => i).flatMap(i => {
* println("delay repeat by " + i + " second(s)")
* Observable.timer(Duration(i, TimeUnit.SECONDS))
* })
* }).toBlocking.foreach(s => println(s))
* }}}
*
* Output is:
*
* {{{
* subscribing
* delay repeat by 1 second(s)
* subscribing
* delay repeat by 2 second(s)
* subscribing
* delay repeat by 3 second(s)
* subscribing
* }}}
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>`repeatWhen` operates by default on the `trampoline` [[Scheduler]].</dd>
* </dl>
*
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the repeat.
* @param notificationHandler receives an Observable of a Unit with which a user can complete or error, aborting the repeat.
* @return the source Observable modified with repeat logic
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
* @since 0.20
*/
def repeatWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any]): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on).asJavaObservable
def repeatWhen(notificationHandler: Observable[Unit] => Observable[Any]): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: Void], _ <: rx.Observable[_ <: Any]] =
(jOv: rx.Observable[_ <: Void]) => {
val ov = toScalaObservable[Void](jOv)
notificationHandler(ov.map( _ => Unit )).asJavaObservable
}

toScalaObservable[T](asJavaObservable.repeatWhen(f))
Expand Down
Loading

0 comments on commit 7b1d202

Please sign in to comment.