Skip to content

Commit

Permalink
Upgrade to RxJava 1.0.0-RC5
Browse files Browse the repository at this point in the history
- Update retryWhen/repeatWhen RxJava signature conversion to match

- Remove groupByUntil methods since they were removed from RxJava
  • Loading branch information
jbripley committed Oct 7, 2014
1 parent f684043 commit 445a85c
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 88 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ scalaVersion in ThisBuild := "2.11.2"
crossScalaVersions in ThisBuild := Seq("2.10.4", "2.11.2")

libraryDependencies ++= Seq(
"io.reactivex" % "rxjava" % "1.0.0-rc.4",
"io.reactivex" % "rxjava" % "1.0.0-rc.5",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"junit" % "junit" % "4.11" % "test",
"org.scalatest" %% "scalatest" % "2.2.2" % "test")
14 changes: 0 additions & 14 deletions examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -391,20 +391,6 @@ class RxScalaDemo extends JUnitSuite {
waitFor(Olympics.yearTicks)
}

@Test def groupByUntilExample() {
val numbers = Observable.interval(250 millis).take(14)
val grouped = numbers.groupByUntil(x => x % 2){ case (key, obs) => obs.filter(x => x == 7) }
val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten
sequenced.subscribe(x => println(s"Emitted group: $x"))
}

@Test def groupByUntilExample2() {
val numbers = Observable.interval(250 millis).take(14)
val grouped = numbers.groupByUntil(x => x % 2, x => x * 10){ case (key, obs) => Observable.interval(2 seconds) }
val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten
sequenced.toBlocking.foreach(x => println(s"Emitted group: $x"))
}

@Test def combineLatestExample() {
val firstCounter = Observable.interval(250 millis)
val secondCounter = Observable.interval(550 millis)
Expand Down
83 changes: 16 additions & 67 deletions src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2260,57 +2260,6 @@ trait Observable[+T]
}
}

/**
* Groups the items emitted by this Observable according to a specified discriminator function and terminates these groups
* according to a function.
*
* @param f
* a function that extracts the key from an item
* @param closings
* the function that accepts the key of a given group and an observable representing that group, and returns
* an observable that emits a single Closing when the group should be closed.
* @tparam K
* the type of the keys returned by the discriminator function.
* @return an Observable that emits `(key, observable)` pairs, where `observable`
* contains all items for which `f` returned `key` before `closings` emits a value.
*/
def groupByUntil[K](f: T => K)(closings: (K, Observable[T])=>Observable[Any]): Observable[(K, Observable[T])] = {
val fclosing: Func1[_ >: rx.observables.GroupedObservable[K, _ <: T], _ <: rx.Observable[_ <: Any]] =
(jGrObs: rx.observables.GroupedObservable[K, _ <: T]) => closings(jGrObs.getKey, toScalaObservable[T](jGrObs)).asJavaObservable
val o1 = asJavaObservable.groupByUntil[K, Any](f, fclosing) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]]
val func = (o: rx.observables.GroupedObservable[K, _ <: T]) => (o.getKey, toScalaObservable[T](o))
toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
}

/**
* Groups the items emitted by an [[Observable]] (transformed by a selector) according to a specified key selector function
* until the duration Observable expires for the key.
*
* <img width="640" height="375" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/groupByUntil.png">
*
* <em>Note:</em> The `Observable` in the pair `(K, Observable[V])` will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those `Observable` that
* do not concern you. Instead, you can signal to them that they may discard their buffers by applying an operator like `take(0)` to them.
*
* @param keySelector a function to extract the key for each item
* @param valueSelector a function to map each item emitted by the source [[Observable]] to an item emitted by one
* of the resulting `Observable[V]`s
* @param closings a function to signal the expiration of a group
* @return an [[Observable]] that emits pairs of key and `Observable[V]`, each of which corresponds to a key
* value and each of which emits all items emitted by the source [[Observable]] during that
* key's duration that share that same key value, transformed by the value selector
*/
def groupByUntil[K, V](keySelector: T => K, valueSelector: T => V)(closings: (K, Observable[V]) => Observable[Any]): Observable[(K, Observable[V])] = {
val jKeySelector: Func1[_ >: T, _ <: K] = keySelector
val jValueSelector: Func1[_ >: T, _ <: V] = valueSelector
val jDurationSelector = new Func1[rx.observables.GroupedObservable[_ <: K, _ <: V], rx.Observable[_ <: Any]] {
override def call(jgo: rx.observables.GroupedObservable[_ <: K, _ <: V]): rx.Observable[_ <: Any] = closings(jgo.getKey, toScalaObservable[V](jgo))
}
val f = (o: rx.observables.GroupedObservable[K, _ <: V]) => (o.getKey, toScalaObservable[V](o))
val jo = asJavaObservable.groupByUntil[K, V, Any](jKeySelector, jValueSelector, jDurationSelector).map[(K, Observable[V])](f)
toScalaObservable[(K, Observable[V])](jo)
}

/**
* Correlates the items emitted by two Observables based on overlapping durations.
* <p>
Expand Down Expand Up @@ -3317,10 +3266,10 @@ trait Observable[+T]
* @since 0.20
*/
def retryWhen(notificationHandler: Observable[Throwable] => Observable[Any]): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on.map({ case Notification.OnError(error) => error })).asJavaObservable
val f: Func1[_ >: rx.Observable[_ <: Throwable], _ <: rx.Observable[_ <: Any]] =
(jOt: rx.Observable[_ <: Throwable]) => {
val ot = toScalaObservable[Throwable](jOt)
notificationHandler(ot).asJavaObservable
}

toScalaObservable[T](asJavaObservable.retryWhen(f))
Expand Down Expand Up @@ -3348,10 +3297,10 @@ trait Observable[+T]
* @since 0.20
*/
def retryWhen(notificationHandler: Observable[Throwable] => Observable[Any], scheduler: Scheduler): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on.map({ case Notification.OnError(error) => error })).asJavaObservable
val f: Func1[_ >: rx.Observable[_ <: Throwable], _ <: rx.Observable[_ <: Any]] =
(jOt: rx.Observable[_ <: Throwable]) => {
val ot = toScalaObservable[Throwable](jOt)
notificationHandler(ot).asJavaObservable
}

toScalaObservable[T](asJavaObservable.retryWhen(f, scheduler))
Expand Down Expand Up @@ -3441,10 +3390,10 @@ trait Observable[+T]
* @since 0.20
*/
def repeatWhen(notificationHandler: Observable[Unit] => Observable[Any], scheduler: Scheduler): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on.map( _ => Unit )).asJavaObservable
val f: Func1[_ >: rx.Observable[_ <: Void], _ <: rx.Observable[_ <: Any]] =
(jOv: rx.Observable[_ <: Void]) => {
val ov = toScalaObservable[Void](jOv)
notificationHandler(ov.map( _ => Unit )).asJavaObservable
}

toScalaObservable[T](asJavaObservable.repeatWhen(f, scheduler))
Expand Down Expand Up @@ -3500,10 +3449,10 @@ trait Observable[+T]
* @since 0.20
*/
def repeatWhen(notificationHandler: Observable[Unit] => Observable[Any]): Observable[T] = {
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
notificationHandler(on.map( _ => Unit )).asJavaObservable
val f: Func1[_ >: rx.Observable[_ <: Void], _ <: rx.Observable[_ <: Any]] =
(jOv: rx.Observable[_ <: Void]) => {
val ov = toScalaObservable[Void](jOv)
notificationHandler(ov.map( _ => Unit )).asJavaObservable
}

toScalaObservable[T](asJavaObservable.repeatWhen(f))
Expand Down
10 changes: 4 additions & 6 deletions src/test/scala/rx/lang/scala/CompletenessTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ class CompletenessTest extends JUnitSuite {
"forEach(Action1[_ >: T])" -> "foreach(T => Unit)",
"forEach(Action1[_ >: T], Action1[Throwable])" -> "foreach(T => Unit, Throwable => Unit)",
"forEach(Action1[_ >: T], Action1[Throwable], Action0)" -> "foreach(T => Unit, Throwable => Unit, () => Unit)",
"groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K)((K, Observable[T]) => Observable[Any])",
"groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: T, _ <: TValue], Func1[_ >: GroupedObservable[TKey, TValue], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, T => V)((K, Observable[V]) => Observable[Any])",
"groupJoin(Observable[T2], Func1[_ >: T, _ <: Observable[D1]], Func1[_ >: T2, _ <: Observable[D2]], Func2[_ >: T, _ >: Observable[T2], _ <: R])" -> "groupJoin(Observable[S])(T => Observable[Any], S => Observable[Any], (T, Observable[S]) => R)",
"ignoreElements()" -> "[use `filter(_ => false)`]",
"join(Observable[TRight], Func1[T, Observable[TLeftDuration]], Func1[TRight, Observable[TRightDuration]], Func2[T, TRight, R])" -> "join(Observable[S])(T => Observable[Any], S => Observable[Any], (T, S) => R)",
Expand All @@ -128,8 +126,8 @@ class CompletenessTest extends JUnitSuite {
"publishLast(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "publishLast(Observable[T] => Observable[R])",
"reduce(Func2[T, T, T])" -> "reduce((U, U) => U)",
"reduce(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)",
"repeatWhen(Func1[_ >: Observable[_ <: Notification[_]], _ <: Observable[_]])" -> "repeatWhen(Observable[Notification[Any]] => Observable[Any])",
"repeatWhen(Func1[_ >: Observable[_ <: Notification[_]], _ <: Observable[_]], Scheduler)" -> "repeatWhen(Observable[Notification[Any]] => Observable[Any], Scheduler)",
"repeatWhen(Func1[_ >: Observable[_ <: Void], _ <: Observable[_]])" -> "repeatWhen(Observable[Unit] => Observable[Any])",
"repeatWhen(Func1[_ >: Observable[_ <: Void], _ <: Observable[_]], Scheduler)" -> "repeatWhen(Observable[Unit] => Observable[Any], Scheduler)",
"replay(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "replay(Observable[T] => Observable[R])",
"replay(Func1[_ >: Observable[T], _ <: Observable[R]], Int)" -> "replay(Observable[T] => Observable[R], Int)",
"replay(Func1[_ >: Observable[T], _ <: Observable[R]], Int, Long, TimeUnit)" -> "replay(Observable[T] => Observable[R], Int, Duration)",
Expand All @@ -139,8 +137,8 @@ class CompletenessTest extends JUnitSuite {
"replay(Func1[_ >: Observable[T], _ <: Observable[R]], Long, TimeUnit, Scheduler)" -> "replay(Observable[T] => Observable[R], Duration, Scheduler)",
"replay(Func1[_ >: Observable[T], _ <: Observable[R]], Scheduler)" -> "replay(Observable[T] => Observable[R], Scheduler)",
"retry(Func2[Integer, Throwable, Boolean])" -> "retry((Int, Throwable) => Boolean)",
"retryWhen(Func1[_ >: Observable[_ <: Notification[_]], _ <: Observable[_]], Scheduler)" -> "retryWhen(Observable[Notification[Any]] => Observable[Any], Scheduler)",
"retryWhen(Func1[_ >: Observable[_ <: Notification[_]], _ <: Observable[_]])" -> "retryWhen(Observable[Notification[Any]] => Observable[Any])",
"retryWhen(Func1[_ >: Observable[_ <: Throwable], _ <: Observable[_]], Scheduler)" -> "retryWhen(Observable[Throwable] => Observable[Any], Scheduler)",
"retryWhen(Func1[_ >: Observable[_ <: Throwable], _ <: Observable[_]])" -> "retryWhen(Observable[Throwable] => Observable[Any])",
"sample(Observable[U])" -> "sample(Observable[Any])",
"scan(Func2[T, T, T])" -> unnecessary,
"scan(R, Func2[R, _ >: T, R])" -> "scan(R)((R, T) => R)",
Expand Down

0 comments on commit 445a85c

Please sign in to comment.