From edb1e60c693a3618735080cfe7fb63d0ee5e79cc Mon Sep 17 00:00:00 2001 From: samuelgruetter Date: Sat, 14 Sep 2013 13:57:51 +0200 Subject: [PATCH 01/11] first Scala groupBy implementation --- .../main/scala/rx/lang/scala/Observable.scala | 52 +++++-------------- 1 file changed, 13 insertions(+), 39 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 4c4cccf4a0..550f042c4b 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -1215,51 +1215,25 @@ class Observable[+T](val asJava: rx.Observable[_ <: T]) // because we can just use ++ instead /** - * Groups the items emitted by an Observable according to a specified criterion, and emits these - * grouped items as {@link GroupedObservable}s, one GroupedObservable per group. - *

- * + * Groups the items emitted by this Observable according to a specified discriminator function. * - * @param keySelector + * @param f * a function that extracts the key from an item - * @param elementSelector - * a function to map a source item to an item in a {@link GroupedObservable} - * @param - * the key type - * @param - * the type of items emitted by the resulting {@link GroupedObservable}s - * @return an Observable that emits {@link GroupedObservable}s, each of which corresponds to a - * unique key value and emits items representing items from the source Observable that - * share that key value - */ - /* TODO make a Scala GroupedObservable and groupBy - def groupBy[K,R](keySelector: T => K, elementSelector: T => R ): Observable[GroupedObservable[K,R]] = { - ??? - } - */ - // public Observable> groupBy(final Func1 keySelector, final Func1 elementSelector) - - /** - * Groups the items emitted by an Observable according to a specified criterion, and emits these - * grouped items as {@link GroupedObservable}s, one GroupedObservable per group. - *

- * - * - * @param keySelector - * a function that extracts the key for each item * @param - * the key type - * @return an Observable that emits {@link GroupedObservable}s, each of which corresponds to a - * unique key value and emits items representing items from the source Observable that - * share that key value + * the type of keys returned by the discriminator function. + * @return an Observable that emits {@code (key, observable)} pairs, where {@code observable} + * contains all items for which {@code f} returned {@code key}. */ - /* TODO - def groupBy[K](keySelector: T => K ): Observable[GroupedObservable[K,T]] = { - ??? + def groupBy[K](f: T => K): Observable[(K, Observable[T])] = { + val o1 = asJava.groupBy[K](f) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]] + val func = (o: rx.observables.GroupedObservable[K, _ <: T]) => (o.getKey(), Observable[T](o)) + Observable[(K, Observable[T])](o1.map[(K, Observable[T])](func)) } - */ - // public Observable> groupBy(final Func1 keySelector) + // There's no method corresponding to + // public Observable> groupBy(final Func1 keySelector, final Func1 elementSelector) + // because this can be obtained by combining groupBy and map (as in Scala) + /** * Given an Observable that emits Observables, creates a single Observable that * emits the items emitted by the most recently published of those Observables. From 8d33df1198d385c5f1b6a38ce85f2f315fd9a632 Mon Sep 17 00:00:00 2001 From: samuelgruetter Date: Sat, 14 Sep 2013 13:58:12 +0200 Subject: [PATCH 02/11] start groupBy examples --- .../rx/lang/scala/examples/Olympics.scala | 39 ++++++++++++++++ .../rx/lang/scala/examples/RxScalaDemo.scala | 45 ++++++++++++++++++- 2 files changed, 83 insertions(+), 1 deletion(-) create mode 100644 language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala new file mode 100644 index 0000000000..f2180cf402 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala @@ -0,0 +1,39 @@ +package rx.lang.scala.examples + +object Olympics { + case class Medal(val year: Int, val games: String, val discipline: String, val medal: String, val athlete: String, val country: String) + + val mountainBikeMedals = List( + Medal(2012, "London 2012", "cross-country men", "Gold", "Jaroslav KULHAVY", "Czech Republic"), + Medal(2012, "London 2012", "cross-country men", "Silver", "Nino SCHURTER", "Switzerland"), + Medal(2012, "London 2012", "cross-country men", "Bronze", "Marco Aurelio FONTANA", "Italy"), + Medal(2012, "London 2012", "cross-country women", "Gold", "Julie BRESSET", "France"), + Medal(2012, "London 2012", "cross-country women", "Silver", "Sabine SPITZ", "Germany"), + Medal(2012, "London 2012", "cross-country women", "Bronze", "Georgia GOULD", "United States of America"), + Medal(2008, "Beijing 2008", "cross-country women", "Gold", "Sabine SPITZ", "Germany"), + Medal(2008, "Beijing 2008", "cross-country women", "Silver", "Maja WLOSZCZOWSKA", "Poland"), + Medal(2008, "Beijing 2008", "cross-country women", "Bronze", "Irina KALENTYEVA", "Russian Federation"), + Medal(2008, "Beijing 2008", "cross-country men", "Gold", "Julien ABSALON", "France"), + Medal(2008, "Beijing 2008", "cross-country men", "Silver", "Jean-Christophe PERAUD", "France"), + Medal(2008, "Beijing 2008", "cross-country men", "Bronze", "Nino SCHURTER", "Switzerland"), + Medal(2004, "Athens 2004", "cross-country men", "Gold", "Julien ABSALON", "France"), + Medal(2004, "Athens 2004", "cross-country men", "Silver", "Jose Antonio HERMIDA RAMOS", "Spain"), + Medal(2004, "Athens 2004", "cross-country men", "Bronze", "Bart BRENTJENS", "Netherlands"), + Medal(2004, "Athens 2004", "cross-country women", "Gold", "Gunn-Rita DAHLE", "Norway"), + Medal(2004, "Athens 2004", "cross-country women", "Silver", "Marie-Helene PREMONT", "Canada"), + Medal(2004, "Athens 2004", "cross-country women", "Bronze", "Sabine SPITZ", "Germany"), + Medal(2000, "Sydney 2000", "cross-country women", "Gold", "Paola PEZZO", "Italy"), + Medal(2000, "Sydney 2000", "cross-country women", "Silver", "Barbara BLATTER", "Switzerland"), + Medal(2000, "Sydney 2000", "cross-country women", "Bronze", "Marga FULLANA", "Spain"), + Medal(2000, "Sydney 2000", "cross-country men", "Gold", "Miguel MARTINEZ", "France"), + Medal(2000, "Sydney 2000", "cross-country men", "Silver", "Filip MEIRHAEGHE", "Belgium"), + Medal(2000, "Sydney 2000", "cross-country men", "Bronze", "Christoph SAUSER", "Switzerland"), + Medal(1996, "Atlanta 1996", "cross-country men", "Silver", "Thomas FRISCHKNECHT", "Switzerland"), + Medal(1996, "Atlanta 1996", "cross-country men", "Bronze", "Miguel MARTINEZ", "France"), + Medal(1996, "Atlanta 1996", "cross-country men", "Gold", "Bart BRENTJENS", "Netherlands"), + Medal(1996, "Atlanta 1996", "cross-country women", "Gold", "Paola PEZZO", "Italy"), + Medal(1996, "Atlanta 1996", "cross-country women", "Silver", "Alison SYDOR", "Canada"), + Medal(1996, "Atlanta 1996", "cross-country women", "Bronze", "Susan DEMATTEI", "United States of America") + ).reverse + +} \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala index d46bfe1d3b..461c280f4c 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -23,7 +23,7 @@ import org.junit.{Before, Test, Ignore} import org.junit.Assert._ import rx.lang.scala.concurrency.NewThreadScheduler -@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily +//@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily class RxScalaDemo extends JUnitSuite { @Test def intervalExample() { @@ -158,6 +158,49 @@ class RxScalaDemo extends JUnitSuite { waitFor(o) } + def sampleAllUntilComplete[T](o: Observable[T], period: Duration): Observable[T] = { + for ((element, tick) <- o zip Observable.interval(period)) yield element + } + + @Ignore //TODO + @Test def groupByExample() { + import Olympics._ + // `: _*` converts list to varargs + val medals = Observable[Medal](Olympics.mountainBikeMedals : _*) + + // 1 second = 4 years :D + val medalsByYear = sampleAllUntilComplete(medals.groupBy(_.year), 1 seconds) + + /* + val t = (for ((year, medals) <- medalsByYear) yield medals).flatMap(ms => ms) + t.subscribe(println(_)) + */ + + val timedMedals = for ((year, medals) <- medalsByYear; medal <- medals) yield medal + + timedMedals.subscribe(println(_)) // doesn't print ??? + + Thread.sleep(5000) + + /* + medalsByYear.subscribe(p => println(p._1)) + + //waitFor(medalsByYear) + + val byCountry = medals.groupBy(_.country) + + def score(medals: Observable[Medal]) = medals.fold((0, 0, 0))((s, m) => (s, m.medal) match { + case ((gold, silver, bronze), "Gold") => (gold+1, silver, bronze) + case ((gold, silver, bronze), "Silver") => (gold, silver+1, bronze) + case ((gold, silver, bronze), "Bronze") => (gold, silver, bronze+1) + }) + + val scores = for ((country, medals) <- byCountry) yield (country, score(medals)) + + println(scores.toBlockingObservable.toList) + */ + } + def output(s: String): Unit = println(s) // blocks until obs has completed From ed030db911107a2c12bc0a791518656b3a72d13c Mon Sep 17 00:00:00 2001 From: samuelgruetter Date: Mon, 16 Sep 2013 13:19:20 +0200 Subject: [PATCH 03/11] update scala README --- language-adaptors/rxjava-scala/README.md | 67 +++++++++++++++++++++++- 1 file changed, 65 insertions(+), 2 deletions(-) diff --git a/language-adaptors/rxjava-scala/README.md b/language-adaptors/rxjava-scala/README.md index 470a65744e..c4ad66d0af 100644 --- a/language-adaptors/rxjava-scala/README.md +++ b/language-adaptors/rxjava-scala/README.md @@ -1,8 +1,71 @@ # Scala Adaptor for RxJava -There's an old Scala adaptor ( `rx.lang.scala.RxImplicits` with test `rx.lang.scala.RxImplicitsTest` ), which is deprecated. All other classes in `rx.lang.scala` belong to the new adaptor. +This adaptor allows to use RxJava in Scala with anonymous functions, e.g. -# Binaries +```scala +val o = Observable.interval(200 millis).take(5) +o.subscribe(n => println("n = " + n)) +Observable(1, 2, 3, 4).reduce(_ + _) +``` + +For-comprehensions are also supported: + +```scala +val first = Observable(10, 11, 12) +val second = Observable(10, 11, 12) +val booleans = for ((n1, n2) <- (first zip second)) yield (n1 == n2) +``` + +Further, this adaptor attempts to expose an API which is as Scala-idiomatic as possible. This means that certain methods have been renamed, their signature was changed, or static methods were changed to instance methods. Some examples: + +```scala + // instead of concat: +def ++[U >: T](that: Observable[U]): Observable[U] + +// instance method instead of static: +def zip[U](that: Observable[U]): Observable[(T, U)] + +// the implicit evidence argument ensures that dematerialize can only be called on Observables of Notifications: +def dematerialize[U](implicit evidence: T <:< Notification[U]): Observable[U] + +// additional type parameter U with lower bound to get covariance right: +def onErrorResumeNext[U >: T](resumeFunction: Throwable => Observable[U]): Observable[U] + +// curried in Scala collections, so curry fold also here: +def fold[R](initialValue: R)(accumulator: (R, T) => R): Observable[R] + +// using Duration instead of (long timepan, TimeUnit duration): +def sample(duration: Duration): Observable[T] + +// called skip in Java, but drop in Scala +def drop(n: Int): Observable[T] + +// there's only mapWithIndex in Java, because Java doesn't have tuples: +def zipWithIndex: Observable[(T, Int)] + +// corresponds to Java's toList: +def toSeq: Observable[Seq[T]] + +// the implicit evidence argument ensures that switch can only be called on Observables of Observables: +def switch[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] + +// Java's from becomes apply, and we use Scala Range +def apply(range: Range): Observable[Int] + +// use Bottom type: +def never: Observable[Nothing] +``` + +Also, the Scala Observable is fully covariant in its type parameter, whereas the Java Observable only achieves partial covariance due to limitations of Java's type system (or if you can fix this, your suggestions are very welcome). + +For more examples, see [RxScalaDemo.scala](https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala). + +Scala code using Rx should only import members from `rx.lang.scala` and below. + +Work on this adaptor is still in progress, and for the moment, the best source of documentation are the comments in the source code of [`rx.lang.scala.Observable`](https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala). + + +## Binaries Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22rxjava-scala%22). From 6e7772dc9118dafa58ee0233452e27ba88ef3f17 Mon Sep 17 00:00:00 2001 From: samuelgruetter Date: Tue, 17 Sep 2013 11:49:17 +0200 Subject: [PATCH 04/11] make Olympics example data timed --- .../rx/lang/scala/examples/Olympics.scala | 70 ++++++++++++------- 1 file changed, 43 insertions(+), 27 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala index f2180cf402..66d74a9321 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala @@ -1,39 +1,55 @@ package rx.lang.scala.examples +import rx.lang.scala.Observable +import scala.concurrent.duration._ + object Olympics { case class Medal(val year: Int, val games: String, val discipline: String, val medal: String, val athlete: String, val country: String) - val mountainBikeMedals = List( - Medal(2012, "London 2012", "cross-country men", "Gold", "Jaroslav KULHAVY", "Czech Republic"), - Medal(2012, "London 2012", "cross-country men", "Silver", "Nino SCHURTER", "Switzerland"), - Medal(2012, "London 2012", "cross-country men", "Bronze", "Marco Aurelio FONTANA", "Italy"), - Medal(2012, "London 2012", "cross-country women", "Gold", "Julie BRESSET", "France"), - Medal(2012, "London 2012", "cross-country women", "Silver", "Sabine SPITZ", "Germany"), - Medal(2012, "London 2012", "cross-country women", "Bronze", "Georgia GOULD", "United States of America"), - Medal(2008, "Beijing 2008", "cross-country women", "Gold", "Sabine SPITZ", "Germany"), - Medal(2008, "Beijing 2008", "cross-country women", "Silver", "Maja WLOSZCZOWSKA", "Poland"), - Medal(2008, "Beijing 2008", "cross-country women", "Bronze", "Irina KALENTYEVA", "Russian Federation"), - Medal(2008, "Beijing 2008", "cross-country men", "Gold", "Julien ABSALON", "France"), - Medal(2008, "Beijing 2008", "cross-country men", "Silver", "Jean-Christophe PERAUD", "France"), - Medal(2008, "Beijing 2008", "cross-country men", "Bronze", "Nino SCHURTER", "Switzerland"), - Medal(2004, "Athens 2004", "cross-country men", "Gold", "Julien ABSALON", "France"), - Medal(2004, "Athens 2004", "cross-country men", "Silver", "Jose Antonio HERMIDA RAMOS", "Spain"), - Medal(2004, "Athens 2004", "cross-country men", "Bronze", "Bart BRENTJENS", "Netherlands"), - Medal(2004, "Athens 2004", "cross-country women", "Gold", "Gunn-Rita DAHLE", "Norway"), - Medal(2004, "Athens 2004", "cross-country women", "Silver", "Marie-Helene PREMONT", "Canada"), - Medal(2004, "Athens 2004", "cross-country women", "Bronze", "Sabine SPITZ", "Germany"), + def mountainBikeMedals: Observable[Medal] = Observable( + Medal(1996, "Atlanta 1996", "cross-country men", "Gold", "Bart BRENTJENS", "Netherlands"), + Medal(1996, "Atlanta 1996", "cross-country women", "Gold", "Paola PEZZO", "Italy"), + Medal(1996, "Atlanta 1996", "cross-country men", "Silver", "Thomas FRISCHKNECHT", "Switzerland"), + Medal(1996, "Atlanta 1996", "cross-country women", "Silver", "Alison SYDOR", "Canada"), + Medal(1996, "Atlanta 1996", "cross-country men", "Bronze", "Miguel MARTINEZ", "France"), + Medal(1996, "Atlanta 1996", "cross-country women", "Bronze", "Susan DEMATTEI", "United States of America") + ) ++ fourYearsEmpty ++ Observable( Medal(2000, "Sydney 2000", "cross-country women", "Gold", "Paola PEZZO", "Italy"), Medal(2000, "Sydney 2000", "cross-country women", "Silver", "Barbara BLATTER", "Switzerland"), Medal(2000, "Sydney 2000", "cross-country women", "Bronze", "Marga FULLANA", "Spain"), Medal(2000, "Sydney 2000", "cross-country men", "Gold", "Miguel MARTINEZ", "France"), Medal(2000, "Sydney 2000", "cross-country men", "Silver", "Filip MEIRHAEGHE", "Belgium"), - Medal(2000, "Sydney 2000", "cross-country men", "Bronze", "Christoph SAUSER", "Switzerland"), - Medal(1996, "Atlanta 1996", "cross-country men", "Silver", "Thomas FRISCHKNECHT", "Switzerland"), - Medal(1996, "Atlanta 1996", "cross-country men", "Bronze", "Miguel MARTINEZ", "France"), - Medal(1996, "Atlanta 1996", "cross-country men", "Gold", "Bart BRENTJENS", "Netherlands"), - Medal(1996, "Atlanta 1996", "cross-country women", "Gold", "Paola PEZZO", "Italy"), - Medal(1996, "Atlanta 1996", "cross-country women", "Silver", "Alison SYDOR", "Canada"), - Medal(1996, "Atlanta 1996", "cross-country women", "Bronze", "Susan DEMATTEI", "United States of America") - ).reverse + Medal(2000, "Sydney 2000", "cross-country men", "Bronze", "Christoph SAUSER", "Switzerland") + ) ++ fourYearsEmpty ++ Observable( + Medal(2004, "Athens 2004", "cross-country men", "Gold", "Julien ABSALON", "France"), + Medal(2004, "Athens 2004", "cross-country men", "Silver", "Jose Antonio HERMIDA RAMOS", "Spain"), + Medal(2004, "Athens 2004", "cross-country men", "Bronze", "Bart BRENTJENS", "Netherlands"), + Medal(2004, "Athens 2004", "cross-country women", "Gold", "Gunn-Rita DAHLE", "Norway"), + Medal(2004, "Athens 2004", "cross-country women", "Silver", "Marie-Helene PREMONT", "Canada"), + Medal(2004, "Athens 2004", "cross-country women", "Bronze", "Sabine SPITZ", "Germany") + ) ++ fourYearsEmpty ++ Observable( + Medal(2008, "Beijing 2008", "cross-country women", "Gold", "Sabine SPITZ", "Germany"), + Medal(2008, "Beijing 2008", "cross-country women", "Silver", "Maja WLOSZCZOWSKA", "Poland"), + Medal(2008, "Beijing 2008", "cross-country women", "Bronze", "Irina KALENTYEVA", "Russian Federation"), + Medal(2008, "Beijing 2008", "cross-country men", "Gold", "Julien ABSALON", "France"), + Medal(2008, "Beijing 2008", "cross-country men", "Silver", "Jean-Christophe PERAUD", "France"), + Medal(2008, "Beijing 2008", "cross-country men", "Bronze", "Nino SCHURTER", "Switzerland") + ) ++ fourYearsEmpty ++ Observable( + Medal(2012, "London 2012", "cross-country men", "Gold", "Jaroslav KULHAVY", "Czech Republic"), + Medal(2012, "London 2012", "cross-country men", "Silver", "Nino SCHURTER", "Switzerland"), + Medal(2012, "London 2012", "cross-country men", "Bronze", "Marco Aurelio FONTANA", "Italy"), + Medal(2012, "London 2012", "cross-country women", "Gold", "Julie BRESSET", "France"), + Medal(2012, "London 2012", "cross-country women", "Silver", "Sabine SPITZ", "Germany"), + Medal(2012, "London 2012", "cross-country women", "Bronze", "Georgia GOULD", "United States of America") + ) + + // speed it up :D + val fourYears = 4000.millis + + val neverUsedDummyMedal = Medal(3333, "?", "?", "?", "?", "?") + + def fourYearsEmpty: Observable[Medal] = { + Observable.interval(fourYears).take(1).map(i => neverUsedDummyMedal).filter(m => false) + } } \ No newline at end of file From ff2b3828ba0cd721888c8d5c7856b8a767a78895 Mon Sep 17 00:00:00 2001 From: samuelgruetter Date: Tue, 17 Sep 2013 11:54:31 +0200 Subject: [PATCH 05/11] more examples/tests in RxScalaDemo --- .../rx/lang/scala/examples/RxScalaDemo.scala | 76 ++++++++++--------- 1 file changed, 41 insertions(+), 35 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala index 461c280f4c..13880ce540 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -158,47 +158,53 @@ class RxScalaDemo extends JUnitSuite { waitFor(o) } - def sampleAllUntilComplete[T](o: Observable[T], period: Duration): Observable[T] = { - for ((element, tick) <- o zip Observable.interval(period)) yield element + @Test def testGroupByThenFlatMap() { + val m = Observable(1, 2, 3, 4) + val g = m.groupBy(i => i % 2) + val t = g.flatMap((p: (Int, Observable[Int])) => p._2) + assertEquals(List(1, 2, 3, 4), t.toBlockingObservable.toList) } - @Ignore //TODO - @Test def groupByExample() { - import Olympics._ - // `: _*` converts list to varargs - val medals = Observable[Medal](Olympics.mountainBikeMedals : _*) - - // 1 second = 4 years :D - val medalsByYear = sampleAllUntilComplete(medals.groupBy(_.year), 1 seconds) - - /* - val t = (for ((year, medals) <- medalsByYear) yield medals).flatMap(ms => ms) - t.subscribe(println(_)) - */ - - val timedMedals = for ((year, medals) <- medalsByYear; medal <- medals) yield medal - - timedMedals.subscribe(println(_)) // doesn't print ??? - - Thread.sleep(5000) - - /* - medalsByYear.subscribe(p => println(p._1)) - - //waitFor(medalsByYear) + @Test def testGroupByThenFlatMapByForComprehension() { + val m = Observable(1, 2, 3, 4) + val g = m.groupBy(i => i % 2) + val t = for ((i, o) <- g; n <- o) yield n + assertEquals(List(1, 2, 3, 4), t.toBlockingObservable.toList) + } + + @Test def testGroupByThenFlatMapByForComprehensionWithTiming() { + val m = Observable.interval(100 millis).take(4) + val g = m.groupBy(i => i % 2) + val t = for ((i, o) <- g; n <- o) yield n + assertEquals(List(0, 1, 2, 3), t.toBlockingObservable.toList) + } + + @Test def groupByExampleTest() { + val medalsByCountry = Olympics.mountainBikeMedals.groupBy(medal => medal.country) - val byCountry = medals.groupBy(_.country) + val firstMedalOfEachCountry = + medalsByCountry.flatMap((p: (String, Observable[Olympics.Medal])) => p._2.take(1)) + + firstMedalOfEachCountry.subscribe(medal => { + println(s"${medal.country} wins its first medal in ${medal.year}") + }) - def score(medals: Observable[Medal]) = medals.fold((0, 0, 0))((s, m) => (s, m.medal) match { - case ((gold, silver, bronze), "Gold") => (gold+1, silver, bronze) - case ((gold, silver, bronze), "Silver") => (gold, silver+1, bronze) - case ((gold, silver, bronze), "Bronze") => (gold, silver, bronze+1) - }) + //waitFor(firstMedalOfEachCountry) + Thread.sleep(20000) + } + + @Ignore // TODO this test one does not terminate! + @Test def groupByExample() { + val medalsByCountry = Olympics.mountainBikeMedals.groupBy(medal => medal.country) - val scores = for ((country, medals) <- byCountry) yield (country, score(medals)) + val firstMedalOfEachCountry = + for ((country, medals) <- medalsByCountry; firstMedal <- medals.take(1)) yield firstMedal + + firstMedalOfEachCountry.subscribe(medal => { + println(s"${medal.country} wins its first medal in ${medal.year}") + }) - println(scores.toBlockingObservable.toList) - */ + waitFor(firstMedalOfEachCountry) } def output(s: String): Unit = println(s) From 92f6bc1f67a829c7147863aff1134ec000fbfffa Mon Sep 17 00:00:00 2001 From: samuelgruetter Date: Tue, 17 Sep 2013 13:03:33 +0200 Subject: [PATCH 06/11] finish groupBy example --- .../rx/lang/scala/examples/Olympics.scala | 7 ++++++- .../rx/lang/scala/examples/RxScalaDemo.scala | 18 +----------------- 2 files changed, 7 insertions(+), 18 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala index 66d74a9321..d826aa58e8 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/Olympics.scala @@ -49,7 +49,12 @@ object Olympics { val neverUsedDummyMedal = Medal(3333, "?", "?", "?", "?", "?") def fourYearsEmpty: Observable[Medal] = { - Observable.interval(fourYears).take(1).map(i => neverUsedDummyMedal).filter(m => false) + // TODO this should return an observable which emits nothing during fourYears and then completes + // Because of https://github.com/Netflix/RxJava/issues/388, we get non-terminating tests + // So we don't use this: + // Observable.interval(fourYears).take(1).map(i => neverUsedDummyMedal).filter(m => false) + // But we just return empty, which completes immediately + Observable() } } \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala index 13880ce540..bde8d63555 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -136,7 +136,6 @@ class RxScalaDemo extends JUnitSuite { } @Test def testTwoSubscriptionsToOneInterval() { - // TODO this does not yet work as expected! val o = Observable.interval(100 millis).take(8) o.subscribe( i => println(s"${i}a (on thread #${Thread.currentThread().getId()})") @@ -178,22 +177,7 @@ class RxScalaDemo extends JUnitSuite { val t = for ((i, o) <- g; n <- o) yield n assertEquals(List(0, 1, 2, 3), t.toBlockingObservable.toList) } - - @Test def groupByExampleTest() { - val medalsByCountry = Olympics.mountainBikeMedals.groupBy(medal => medal.country) - - val firstMedalOfEachCountry = - medalsByCountry.flatMap((p: (String, Observable[Olympics.Medal])) => p._2.take(1)) - - firstMedalOfEachCountry.subscribe(medal => { - println(s"${medal.country} wins its first medal in ${medal.year}") - }) - - //waitFor(firstMedalOfEachCountry) - Thread.sleep(20000) - } - - @Ignore // TODO this test one does not terminate! + @Test def groupByExample() { val medalsByCountry = Olympics.mountainBikeMedals.groupBy(medal => medal.country) From 04f69b256c5eed2f7145293d45818f71a758dcbe Mon Sep 17 00:00:00 2001 From: samuelgruetter Date: Tue, 17 Sep 2013 13:11:39 +0200 Subject: [PATCH 07/11] move ImplicitFunctionConversions out of internal package --- .../scala/{internal => }/ImplicitFunctionConversions.scala | 6 +----- .../src/main/scala/rx/lang/scala/Observable.scala | 6 +++--- .../rx/lang/scala/observables/BlockingObservable.scala | 2 +- .../rx/lang/scala/observables/ConnectableObservable.scala | 2 +- 4 files changed, 6 insertions(+), 10 deletions(-) rename language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/{internal => }/ImplicitFunctionConversions.scala (97%) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/internal/ImplicitFunctionConversions.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala similarity index 97% rename from language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/internal/ImplicitFunctionConversions.scala rename to language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala index e5f6e49fc6..de0268b70b 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/internal/ImplicitFunctionConversions.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.lang.scala.internal +package rx.lang.scala import java.{lang => jlang} @@ -25,10 +25,6 @@ import rx.util.functions.Func2 import rx.util.functions.Func3 import rx.util.functions.Func4 import java.{lang => jlang} -import rx.Observer -import rx.Subscription -import java.{lang => jlang} -import scala.language.implicitConversions /** * These function conversions are only used by the ScalaAdapter, users of RxScala don't need them. diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 550f042c4b..6341285a60 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -38,7 +38,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T]) import rx.util.functions._ import rx.lang.scala.{Notification, Subscription, Scheduler, Observer} import rx.lang.scala.util._ - import rx.lang.scala.internal.ImplicitFunctionConversions._ + import rx.lang.scala.ImplicitFunctionConversions._ /** * An {@link Observer} must call an Observable's {@code subscribe} method in order to @@ -1456,7 +1456,7 @@ object Observable { import rx.{Observable => JObservable} import rx.lang.scala.{Notification, Subscription, Scheduler, Observer} import rx.lang.scala.util._ - import rx.lang.scala.internal.ImplicitFunctionConversions._ + import rx.lang.scala.ImplicitFunctionConversions._ private[scala] def jObsOfListToScObsOfSeq[T](jObs: rx.Observable[_ <: java.util.List[T]]): Observable[Seq[T]] = { @@ -1774,7 +1774,7 @@ object Observable { // "implementation restriction: nested class is not allowed in value class. // This restriction is planned to be removed in subsequent releases." class WithFilter[+T] private[scala] (p: T => Boolean, asJava: rx.Observable[_ <: T]) { - import rx.lang.scala.internal.ImplicitFunctionConversions._ + import rx.lang.scala.ImplicitFunctionConversions._ def map[B](f: T => B): Observable[B] = { Observable[B](asJava.filter(p).map[B](f)) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala index 3b3562d652..8227caf85e 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala @@ -16,7 +16,7 @@ package rx.lang.scala.observables import scala.collection.JavaConverters._ -import rx.lang.scala.internal.ImplicitFunctionConversions._ +import rx.lang.scala.ImplicitFunctionConversions._ class BlockingObservable[+T](val asJava: rx.observables.BlockingObservable[_ <: T]) extends AnyVal diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala index 7740ad043f..8530517484 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala @@ -20,6 +20,6 @@ class ConnectableObservable[+T](val asJava: rx.observables.ConnectableObservable import rx.lang.scala._ import rx.lang.scala.util._ import rx.{Observable => JObservable} - import rx.lang.scala.internal.ImplicitFunctionConversions._ + import rx.lang.scala.ImplicitFunctionConversions._ } \ No newline at end of file From 41c91fbd136b612fa5bfc018c542871acafbec14 Mon Sep 17 00:00:00 2001 From: samuelgruetter Date: Tue, 17 Sep 2013 13:24:17 +0200 Subject: [PATCH 08/11] add comment to ImplicitFunctionConversions and reformat it to use indent of 2 spaces, as all other files in rx.lang.scala --- .../scala/ImplicitFunctionConversions.scala | 244 ++++++++---------- 1 file changed, 111 insertions(+), 133 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala index de0268b70b..b7f13b0783 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,137 +15,115 @@ */ package rx.lang.scala - -import java.{lang => jlang} -import rx.util.functions.Action0 -import rx.util.functions.Action1 -import rx.util.functions.Func0 -import rx.util.functions.Func1 -import rx.util.functions.Func2 -import rx.util.functions.Func3 -import rx.util.functions.Func4 -import java.{lang => jlang} +import java.{ lang => jlang } +import rx.util.functions._ /** - * These function conversions are only used by the ScalaAdapter, users of RxScala don't need them. + * These function conversions convert between Scala functions and Rx Funcs and Actions. + * Most users RxScala won't need them, but they might be useful if one wants to use + * the rx.Observable directly instead of using rx.lang.scala.Observable or if one wants + * to use a Java library taking/returning Funcs and Actions. */ object ImplicitFunctionConversions { - // code below is copied from - // https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/RxImplicits.scala - - import java.{ lang => jlang } - import language.implicitConversions - - import rx.observables.BlockingObservable - import rx.util.functions._ - import rx.{Observer, Subscription} - - implicit def scalaFunction1ToOnSubscribeFunc[T](f: rx.lang.scala.Observer[T] => Subscription) = - new rx.Observable.OnSubscribeFunc[T] { - def onSubscribe(obs: Observer[_ >: T]): Subscription = { - f(obs) - } - } - - /*implicit def scalaFunction1ToOnSubscribeFunc[T](f: Observer[_ >: T] => Subscription) = - new rx.Observable.OnSubscribeFunc[T] { - def onSubscribe(obs: Observer[_ >: T]): Subscription = { - f(obs) - } - }*/ - - /** - * Converts a by-name parameter to a Rx Func0 - */ - implicit def scalaByNameParamToFunc0[B](param: => B): Func0[B] = - new Func0[B]{ - def call(): B = param - } - - - /** - * Converts 0-arg function to Rx Action0 - */ - implicit def scalaFunction0ProducingUnitToAction0(f: (() => Unit)): Action0 = - new Action0 { - def call(): Unit = f() - } - - /** - * Converts 1-arg function to Rx Action1 - */ - implicit def scalaFunction1ProducingUnitToAction1[A](f: (A => Unit)): Action1[A] = - new Action1[A] { - def call(a: A): Unit = f(a) - } - - /** - * Converts 1-arg predicate to Rx Func1[A, java.lang.Boolean] - */ - implicit def scalaBooleanFunction1ToRxBooleanFunc1[A](f: (A => Boolean)): Func1[A, jlang.Boolean] = - new Func1[A, jlang.Boolean] { - def call(a: A): jlang.Boolean = f(a).booleanValue - } - - /** - * Converts 2-arg predicate to Rx Func2[A, B, java.lang.Boolean] - */ - implicit def scalaBooleanFunction2ToRxBooleanFunc1[A, B](f: ((A, B) => Boolean)): Func2[A, B, jlang.Boolean] = - new Func2[A, B, jlang.Boolean] { - def call(a: A, b: B): jlang.Boolean = f(a, b).booleanValue - } - - /** - * Converts a specific function shape (used in takeWhile) to the equivalent Java types with an Rx Func2 - */ - implicit def convertTakeWhileFuncToRxFunc2[A](f: (A, Int) => Boolean): Func2[A, jlang.Integer, jlang.Boolean] = - new Func2[A, jlang.Integer, jlang.Boolean] { - def call(a: A, b: jlang.Integer): jlang.Boolean = f(a, b).booleanValue - } - - /** - * Converts a function shaped ilke compareTo into the equivalent Rx Func2 - */ - implicit def convertComparisonFuncToRxFunc2[A](f: (A, A) => Int): Func2[A, A, jlang.Integer] = - new Func2[A, A, jlang.Integer] { - def call(a1: A, a2: A): jlang.Integer = f(a1, a2).intValue - } - - /* - * This implicit allows Scala code to use any exception type and still work - * with invariant Func1 interface - */ - implicit def exceptionFunction1ToRxExceptionFunc1[A <: Exception, B](f: (A => B)): Func1[Exception, B] = - new Func1[Exception, B] { - def call(ex: Exception): B = f(ex.asInstanceOf[A]) - } - - /** - * The following implicits convert functions of different arities into the Rx equivalents - */ - implicit def scalaFunction0ToRxFunc0[A](f: () => A): Func0[A] = - new Func0[A] { - def call(): A = f() - } - - implicit def scalaFunction1ToRxFunc1[A, B](f: (A => B)): Func1[A, B] = - new Func1[A, B] { - def call(a: A): B = f(a) - } - - implicit def scalaFunction2ToRxFunc2[A, B, C](f: (A, B) => C): Func2[A, B, C] = - new Func2[A, B, C] { - def call(a: A, b: B) = f(a, b) - } - - implicit def scalaFunction3ToRxFunc3[A, B, C, D](f: (A, B, C) => D): Func3[A, B, C, D] = - new Func3[A, B, C, D] { - def call(a: A, b: B, c: C) = f(a, b, c) - } - - implicit def scalaFunction4ToRxFunc4[A, B, C, D, E](f: (A, B, C, D) => E): Func4[A, B, C, D, E] = - new Func4[A, B, C, D, E] { - def call(a: A, b: B, c: C, d: D) = f(a, b, c, d) - } - -} \ No newline at end of file + import language.implicitConversions + + implicit def scalaFunction1ToOnSubscribeFunc[T](f: rx.lang.scala.Observer[T] => Subscription) = + new rx.Observable.OnSubscribeFunc[T] { + def onSubscribe(obs: Observer[_ >: T]): Subscription = { + f(obs) + } + } + + /** + * Converts a by-name parameter to a Rx Func0 + */ + implicit def scalaByNameParamToFunc0[B](param: => B): Func0[B] = + new Func0[B] { + def call(): B = param + } + + /** + * Converts 0-arg function to Rx Action0 + */ + implicit def scalaFunction0ProducingUnitToAction0(f: (() => Unit)): Action0 = + new Action0 { + def call(): Unit = f() + } + + /** + * Converts 1-arg function to Rx Action1 + */ + implicit def scalaFunction1ProducingUnitToAction1[A](f: (A => Unit)): Action1[A] = + new Action1[A] { + def call(a: A): Unit = f(a) + } + + /** + * Converts 1-arg predicate to Rx Func1[A, java.lang.Boolean] + */ + implicit def scalaBooleanFunction1ToRxBooleanFunc1[A](f: (A => Boolean)): Func1[A, jlang.Boolean] = + new Func1[A, jlang.Boolean] { + def call(a: A): jlang.Boolean = f(a).booleanValue + } + + /** + * Converts 2-arg predicate to Rx Func2[A, B, java.lang.Boolean] + */ + implicit def scalaBooleanFunction2ToRxBooleanFunc1[A, B](f: ((A, B) => Boolean)): Func2[A, B, jlang.Boolean] = + new Func2[A, B, jlang.Boolean] { + def call(a: A, b: B): jlang.Boolean = f(a, b).booleanValue + } + + /** + * Converts a specific function shape (used in takeWhile) to the equivalent Java types with an Rx Func2 + */ + implicit def convertTakeWhileFuncToRxFunc2[A](f: (A, Int) => Boolean): Func2[A, jlang.Integer, jlang.Boolean] = + new Func2[A, jlang.Integer, jlang.Boolean] { + def call(a: A, b: jlang.Integer): jlang.Boolean = f(a, b).booleanValue + } + + /** + * Converts a function shaped ilke compareTo into the equivalent Rx Func2 + */ + implicit def convertComparisonFuncToRxFunc2[A](f: (A, A) => Int): Func2[A, A, jlang.Integer] = + new Func2[A, A, jlang.Integer] { + def call(a1: A, a2: A): jlang.Integer = f(a1, a2).intValue + } + + /** + * This implicit allows Scala code to use any exception type and still work + * with invariant Func1 interface + */ + implicit def exceptionFunction1ToRxExceptionFunc1[A <: Exception, B](f: (A => B)): Func1[Exception, B] = + new Func1[Exception, B] { + def call(ex: Exception): B = f(ex.asInstanceOf[A]) + } + + /** + * The following implicits convert functions of different arities into the Rx equivalents + */ + implicit def scalaFunction0ToRxFunc0[A](f: () => A): Func0[A] = + new Func0[A] { + def call(): A = f() + } + + implicit def scalaFunction1ToRxFunc1[A, B](f: (A => B)): Func1[A, B] = + new Func1[A, B] { + def call(a: A): B = f(a) + } + + implicit def scalaFunction2ToRxFunc2[A, B, C](f: (A, B) => C): Func2[A, B, C] = + new Func2[A, B, C] { + def call(a: A, b: B) = f(a, b) + } + + implicit def scalaFunction3ToRxFunc3[A, B, C, D](f: (A, B, C) => D): Func3[A, B, C, D] = + new Func3[A, B, C, D] { + def call(a: A, b: B, c: C) = f(a, b, c) + } + + implicit def scalaFunction4ToRxFunc4[A, B, C, D, E](f: (A, B, C, D) => E): Func4[A, B, C, D, E] = + new Func4[A, B, C, D, E] { + def call(a: A, b: B, c: C, d: D) = f(a, b, c, d) + } +} From ac26e42d1e85deac0b7bfa50c3ca3e5298493dd4 Mon Sep 17 00:00:00 2001 From: samuelgruetter Date: Tue, 17 Sep 2013 14:44:25 +0200 Subject: [PATCH 09/11] use Java Subject as contravariant in T and covariant in R --- rxjava-core/src/main/java/rx/Observable.java | 2 +- .../src/main/java/rx/operators/OperationMulticast.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d5d8fb8297..a14e78329c 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -397,7 +397,7 @@ public Subscription subscribe(final Action1 onNext, final Action1 ConnectableObservable multicast(Subject subject) { + public ConnectableObservable multicast(Subject subject) { return OperationMulticast.multicast(this, subject); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java index e83cfdaa03..e24c24a91a 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMulticast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMulticast.java @@ -27,7 +27,7 @@ import rx.subjects.Subject; public class OperationMulticast { - public static ConnectableObservable multicast(Observable source, final Subject subject) { + public static ConnectableObservable multicast(Observable source, final Subject subject) { return new MulticastConnectableObservable(source, subject); } @@ -35,11 +35,11 @@ private static class MulticastConnectableObservable extends ConnectableObs private final Object lock = new Object(); private final Observable source; - private final Subject subject; + private final Subject subject; private Subscription subscription; - public MulticastConnectableObservable(Observable source, final Subject subject) { + public MulticastConnectableObservable(Observable source, final Subject subject) { super(new OnSubscribeFunc() { @Override public Subscription onSubscribe(Observer observer) { From 5efe114f8035a5617ed24acab28b63e64779c014 Mon Sep 17 00:00:00 2001 From: samuelgruetter Date: Tue, 17 Sep 2013 15:16:47 +0200 Subject: [PATCH 10/11] add multicast, publish, replay --- .../main/scala/rx/lang/scala/Observable.scala | 30 ++++++++------- .../rx/lang/scala/examples/RxScalaDemo.scala | 38 +++++++++++++++++++ .../observables/ConnectableObservable.scala | 25 ------------ .../main/scala/rx/lang/scala/package.scala | 6 --- .../rx/lang/scala/subjects/package.scala | 14 +++++++ 5 files changed, 69 insertions(+), 44 deletions(-) delete mode 100644 language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala create mode 100644 language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/package.scala diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 6341285a60..3462c3316a 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -20,7 +20,6 @@ package rx.lang.scala import org.scalatest.junit.JUnitSuite import scala.collection.Seq import rx.lang.scala.observables.BlockingObservable -import rx.lang.scala.observables.ConnectableObservable /** @@ -38,6 +37,7 @@ class Observable[+T](val asJava: rx.Observable[_ <: T]) import rx.util.functions._ import rx.lang.scala.{Notification, Subscription, Scheduler, Observer} import rx.lang.scala.util._ + import rx.lang.scala.subjects.Subject import rx.lang.scala.ImplicitFunctionConversions._ /** @@ -132,11 +132,13 @@ class Observable[+T](val asJava: rx.Observable[_ <: T]) * into * @param * result type - * @return a {@link ConnectableObservable} that upon connection causes the source Observable to - * push results into the specified {@link Subject} + * @return a pair of a start function and an {@link Observable} such that when the start function + * is called, the Observable starts to push results into the specified {@link Subject} */ - // public ConnectableObservable multicast(Subject subject) TODO - + def multicast[R](subject: Subject[T, R]): (() => Subscription, Observable[R]) = { + val javaCO = asJava.multicast[R](subject) + (() => javaCO.connect(), Observable[R](javaCO)) + } /** * Returns an Observable that first emits the items emitted by this, and then the items emitted @@ -904,11 +906,12 @@ class Observable[+T](val asJava: rx.Observable[_ <: T]) *

* * - * @return a {@link ConnectableObservable} that upon connection causes the source Observable to - * emit items to its {@link Observer}s + * @return a pair of a start function and an {@link Observable} such that when the start function + * is called, the Observable starts to emit items to its {@link Observer}s */ - def replay(): ConnectableObservable[T] = { - new ConnectableObservable[T](asJava.replay()) + def replay(): (() => Subscription, Observable[T]) = { + val javaCO = asJava.replay() + (() => javaCO.connect(), Observable[T](javaCO)) } /** @@ -937,11 +940,12 @@ class Observable[+T](val asJava: rx.Observable[_ <: T]) *

* * - * @return a {@link ConnectableObservable} that upon connection causes the source Observable to - * emit items to its {@link Observer}s + * @return a pair of a start function and an {@link Observable} such that when the start function + * is called, the Observable starts to emit items to its {@link Observer}s */ - def publish: ConnectableObservable[T] = { - new ConnectableObservable[T](asJava.publish()) + def publish: (() => Subscription, Observable[T]) = { + val javaCO = asJava.publish() + (() => javaCO.connect(), Observable[T](javaCO)) } // There is no aggregate function with signature diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala index bde8d63555..2900f52a86 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -191,6 +191,44 @@ class RxScalaDemo extends JUnitSuite { waitFor(firstMedalOfEachCountry) } + @Test def exampleWithoutPublish() { + val unshared = Observable(1 to 4) + unshared.subscribe(n => println(s"subscriber 1 gets $n")) + unshared.subscribe(n => println(s"subscriber 2 gets $n")) + } + + @Test def exampleWithPublish() { + val unshared = Observable(1 to 4) + val (startFunc, shared) = unshared.publish + shared.subscribe(n => println(s"subscriber 1 gets $n")) + shared.subscribe(n => println(s"subscriber 2 gets $n")) + startFunc() + } + + def doLater(waitTime: Duration, action: () => Unit): Unit = { + Observable.interval(waitTime).take(1).subscribe(_ => action()) + } + + @Test def exampleWithoutReplay() { + val numbers = Observable.interval(1000 millis).take(6) + val (startFunc, sharedNumbers) = numbers.publish + sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n")) + startFunc() + // subscriber 2 misses 0, 1, 2! + doLater(3500 millis, () => { sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) }) + waitFor(sharedNumbers) + } + + @Test def exampleWithReplay() { + val numbers = Observable.interval(1000 millis).take(6) + val (startFunc, sharedNumbers) = numbers.replay + sharedNumbers.subscribe(n => println(s"subscriber 1 gets $n")) + startFunc() + // subscriber 2 subscribes later but still gets all numbers + doLater(3500 millis, () => { sharedNumbers.subscribe(n => println(s"subscriber 2 gets $n")) }) + waitFor(sharedNumbers) + } + def output(s: String): Unit = println(s) // blocks until obs has completed diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala deleted file mode 100644 index 8530517484..0000000000 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.lang.scala.observables - - -class ConnectableObservable[+T](val asJava: rx.observables.ConnectableObservable[_ <: T]) extends AnyVal { - import rx.lang.scala._ - import rx.lang.scala.util._ - import rx.{Observable => JObservable} - import rx.lang.scala.ImplicitFunctionConversions._ - -} \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala index 6910599783..0f6ea79d34 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/package.scala @@ -53,12 +53,6 @@ rx.plugins.RxJavaErrorHandler rx.plugins.RxJavaObservableExecutionHook rx.plugins.RxJavaPlugins -rx.subjects.AsyncSubject -rx.subjects.BehaviorSubject -rx.subjects.PublishSubject -rx.subjects.ReplaySubject -rx.subjects.Subject - rx.subscriptions.BooleanSubscription rx.subscriptions.CompositeSubscription rx.subscriptions.Subscriptions diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/package.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/package.scala new file mode 100644 index 0000000000..8f99d02bf6 --- /dev/null +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subjects/package.scala @@ -0,0 +1,14 @@ +package rx.lang.scala + +package object subjects { + + // in Java: public abstract class Subject extends Observable implements Observer + type Subject[-T, +R] = rx.subjects.Subject[_ >: T, _ <: R] + + // TODO (including static methods of these classes) + // rx.subjects.AsyncSubject + // rx.subjects.BehaviorSubject + // rx.subjects.PublishSubject + // rx.subjects.ReplaySubject + +} \ No newline at end of file From f54857a4ddb866b846cb5af32702d100070cf43c Mon Sep 17 00:00:00 2001 From: samuelgruetter Date: Tue, 17 Sep 2013 16:57:34 +0200 Subject: [PATCH 11/11] add methods to BlockingObservable --- .../main/scala/rx/lang/scala/Observable.scala | 2 +- .../rx/lang/scala/examples/RxScalaDemo.scala | 10 +- .../observables/BlockingObservable.scala | 107 ++++++++++++++---- 3 files changed, 92 insertions(+), 27 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 3462c3316a..d26602bada 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -1830,7 +1830,7 @@ class UnitTestSuite extends JUnitSuite { @Test def testTest() = { val a: Observable[Int] = Observable() - assertEquals(4, Observable(1, 2, 3, 4).toBlockingObservable.last) + assertEquals(4, Observable(1, 2, 3, 4).toBlockingObservable.toIterable.last) } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala index 2900f52a86..6030329b26 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -23,7 +23,7 @@ import org.junit.{Before, Test, Ignore} import org.junit.Assert._ import rx.lang.scala.concurrency.NewThreadScheduler -//@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily +@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily class RxScalaDemo extends JUnitSuite { @Test def intervalExample() { @@ -229,11 +229,17 @@ class RxScalaDemo extends JUnitSuite { waitFor(sharedNumbers) } + @Test def testSingleOption() { + assertEquals(None, Observable(1, 2).toBlockingObservable.singleOption) + assertEquals(Some(1), Observable(1) .toBlockingObservable.singleOption) + assertEquals(None, Observable() .toBlockingObservable.singleOption) + } + def output(s: String): Unit = println(s) // blocks until obs has completed def waitFor[T](obs: Observable[T]): Unit = { - obs.toBlockingObservable.last + obs.toBlockingObservable.toIterable.last } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala index 8227caf85e..5470f6f1cb 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala @@ -22,40 +22,99 @@ class BlockingObservable[+T](val asJava: rx.observables.BlockingObservable[_ <: extends AnyVal { + /** + * Invoke a method on each item emitted by the {@link Observable}; block until the Observable + * completes. + *

+ * NOTE: This will block even if the Observable is asynchronous. + *

+ * This is similar to {@link Observable#subscribe(Observer)}, but it blocks. Because it blocks it does + * not need the {@link Observer#onCompleted()} or {@link Observer#onError(Throwable)} methods. + *

+ * + * + * @param onNext + * the {@link Action1} to invoke for every item emitted by the {@link Observable} + * @throws RuntimeException + * if an error occurs + */ def foreach(f: T => Unit): Unit = { - asJava.forEach(f) + asJava.forEach(f); } - def last: T = { - asJava.last() : T // useless ascription because of compiler bug + // last -> use toIterable.last + // lastOrDefault -> use toIterable.lastOption + // first -> use toIterable.head + // firstOrDefault -> use toIterable.headOption + // single(predicate) -> use filter and single + // singleOrDefault -> use singleOption + + /** + * Returns an {@link Iterable} that always returns the item most recently emitted by an {@link Observable}. + *

+ * + * + * @param initialValue + * the initial value that will be yielded by the {@link Iterable} sequence if the {@link Observable} has not yet emitted an item + * @return an {@link Iterable} that on each iteration returns the item that the {@link Observable} has most recently emitted + */ + def mostRecent[U >: T](initialValue: U): Iterable[U] = { + val asJavaU = asJava.asInstanceOf[rx.observables.BlockingObservable[U]] + asJavaU.mostRecent(initialValue).asScala: Iterable[U] // useless ascription because of compiler bug } - // last(Func1) - // lastOrDefault(T) - // lastOrDefault(T, Func1) - // mostRecent(T) - // next() + /** + * Returns an {@link Iterable} that blocks until the {@link Observable} emits another item, + * then returns that item. + *

+ * + * + * @return an {@link Iterable} that blocks upon each iteration until the {@link Observable} emits a new item, whereupon the Iterable returns that item + */ + def next: Iterable[T] = { + asJava.next().asScala: Iterable[T] // useless ascription because of compiler bug + } + /** + * If this {@link Observable} completes after emitting a single item, return that item, + * otherwise throw an exception. + *

+ * + * + * @return the single item emitted by the {@link Observable} + */ def single: T = { - asJava.single() : T // useless ascription because of compiler bug + asJava.single(): T // useless ascription because of compiler bug + } + + /** + * If this {@link Observable} completes after emitting a single item, return an Option containing + * this item, otherwise return {@code None}. + */ + def singleOption: Option[T] = { + var size: Int = 0 + var last: Option[T] = None + for (t <- toIterable) { + size += 1 + last = Some(t) + } + if (size == 1) last else None } - - // single(Func1) - - // def singleOption: Option[T] = { TODO } - // corresponds to Java's - // singleOrDefault(T) - - // singleOrDefault(BlockingObservable, boolean, T) - // singleOrDefault(T, Func1) - // toFuture() - + + // TODO toFuture() + + /** + * Returns an {@link Iterator} that iterates over all items emitted by this {@link Observable}. + */ def toIterable: Iterable[T] = { - asJava.toIterable().asScala : Iterable[T] // useless ascription because of compiler bug + asJava.toIterable().asScala: Iterable[T] // useless ascription because of compiler bug } - + + /** + * Returns a {@link List} that contains all items emitted by this {@link Observable}. + */ def toList: List[T] = { - asJava.toIterable().asScala.toList : List[T] // useless ascription because of compiler bug + asJava.toIterable().asScala.toList: List[T] // useless ascription because of compiler bug } -} \ No newline at end of file +}