diff --git a/build.sbt b/build.sbt index 4f031db8..3d4f09b1 100644 --- a/build.sbt +++ b/build.sbt @@ -13,7 +13,7 @@ scalaVersion in ThisBuild := "2.11.2" crossScalaVersions in ThisBuild := Seq("2.10.4", "2.11.2") libraryDependencies ++= Seq( - "io.reactivex" % "rxjava" % "1.0.0-rc.3", + "io.reactivex" % "rxjava" % "1.0.0-rc.4", "org.mockito" % "mockito-core" % "1.9.5" % "test", "junit" % "junit" % "4.11" % "test", "org.scalatest" %% "scalatest" % "2.2.2" % "test") diff --git a/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala b/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala index 59c16983..ea5b8ac1 100755 --- a/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -689,23 +689,6 @@ class RxScalaDemo extends JUnitSuite { x*x } - def work(o1: Observable[Int]): Observable[String] = { - println(s"map() is being called on thread ${Thread.currentThread().getId}") - o1.map(i => s"The square of $i is ${square(i)}") - } - - @Test def parallelExample() { - val t0 = System.currentTimeMillis() - Observable.from(1 to 10).parallel(work(_)).toBlocking.foreach(println(_)) - println(s"Work took ${System.currentTimeMillis()-t0} ms") - } - - @Test def exampleWithoutParallel() { - val t0 = System.currentTimeMillis() - work(Observable.from(1 to 10)).toBlocking.foreach(println(_)) - println(s"Work took ${System.currentTimeMillis()-t0} ms") - } - @Test def toSortedList() { assertEquals(Seq(7, 8, 9, 10), List(10, 7, 8, 9).toObservable.toSeq.map(_.sorted).toBlocking.single) val f = (a: Int, b: Int) => b < a diff --git a/project/bintray.sbt b/project/bintray.sbt index cc152603..a3f6304e 100644 --- a/project/bintray.sbt +++ b/project/bintray.sbt @@ -1 +1,6 @@ +resolvers += Resolver.url( + "bintray-sbt-plugin-releases", + url("http://dl.bintray.com/content/sbt/sbt-plugin-releases"))( + Resolver.ivyStylePatterns) + addSbtPlugin("me.lessis" % "bintray-sbt" % "0.1.2") diff --git a/src/main/scala/rx/lang/scala/JavaConversions.scala b/src/main/scala/rx/lang/scala/JavaConversions.scala index ac54121e..180b1721 100644 --- a/src/main/scala/rx/lang/scala/JavaConversions.scala +++ b/src/main/scala/rx/lang/scala/JavaConversions.scala @@ -65,8 +65,8 @@ object JavaConversions { implicit def toJavaTransformer[T, R](transformer: Observable[T] => Observable[R]): rx.Observable.Transformer[T, R] = { new rx.Observable.Transformer[T, R] { - override def call(o: rx.Observable[_ <: T]): rx.Observable[R] = { - transformer(toScalaObservable(o)).asJavaObservable.asInstanceOf[rx.Observable[R]] + override def call(o: rx.Observable[_ <: T]): rx.Observable[_ <: R] = { + transformer(toScalaObservable(o)).asJavaObservable } } } diff --git a/src/main/scala/rx/lang/scala/Observable.scala b/src/main/scala/rx/lang/scala/Observable.scala index 999e165e..0c87b6d7 100755 --- a/src/main/scala/rx/lang/scala/Observable.scala +++ b/src/main/scala/rx/lang/scala/Observable.scala @@ -3485,35 +3485,6 @@ trait Observable[+T] new BlockingObservable[T](this) } - /** - * Perform work in parallel by sharding an `Observable[T]` on a - * [[rx.lang.scala.schedulers.ComputationScheduler]] and return an `Observable[R]` with the output. - * - * @param f - * a function that applies Observable operators to `Observable[T]` in parallel and returns an `Observable[R]` - * @return an Observable with the output of the function executed on a [[rx.lang.scala.Scheduler]] - */ - def parallel[R](f: Observable[T] => Observable[R]): Observable[R] = { - val fJava: Func1[rx.Observable[T], rx.Observable[R]] = - (jo: rx.Observable[T]) => f(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] - toScalaObservable(asJavaObservable.asInstanceOf[rx.Observable[T]].parallel[R](fJava)) - } - - /** - * Perform work in parallel by sharding an `Observable[T]` on a [[rx.lang.scala.Scheduler]] and return an `Observable[R]` with the output. - * - * @param f - * a function that applies Observable operators to `Observable[T]` in parallel and returns an `Observable[R]` - * @param scheduler - * a [[rx.lang.scala.Scheduler]] to perform the work on. - * @return an Observable with the output of the function executed on a [[rx.lang.scala.Scheduler]] - */ - def parallel[R](f: Observable[T] => Observable[R], scheduler: Scheduler): Observable[R] = { - val fJava: Func1[rx.Observable[T], rx.Observable[R]] = - (jo: rx.Observable[T]) => f(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] - toScalaObservable(asJavaObservable.asInstanceOf[rx.Observable[T]].parallel[R](fJava, scheduler)) - } - /** Tests whether a predicate holds for some of the elements of this `Observable`. * * @param p the predicate used to test elements. diff --git a/src/test/scala/rx/lang/scala/CompletenessTest.scala b/src/test/scala/rx/lang/scala/CompletenessTest.scala index 66842355..21864724 100644 --- a/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -80,7 +80,7 @@ class CompletenessTest extends JUnitSuite { "buffer(Observable[_ <: TOpening], Func1[_ >: TOpening, _ <: Observable[_ <: TClosing]])" -> "slidingBuffer(Observable[Opening])(Opening => Observable[Any])", "cast(Class[R])" -> "[RxJava needs this one because `rx.Observable` is invariant. `Observable` in RxScala is covariant and does not need this operator.]", "collect(R, Action2[R, _ >: T])" -> "foldLeft(R)((R, T) => R)", - "compose(Transformer[_ >: T, R])" -> "compose(Observable[T] => Observable[R])", + "compose(Transformer[_ >: T, _ <: R])" -> "compose(Observable[T] => Observable[R])", "concatWith(Observable[_ <: T])" -> "[use `o1 ++ o2`]", "contains(Any)" -> "contains(U)", "count()" -> "length", @@ -123,8 +123,6 @@ class CompletenessTest extends JUnitSuite { "onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Observable[U])", "onErrorReturn(Func1[Throwable, _ <: T])" -> "onErrorReturn(Throwable => U)", "onExceptionResumeNext(Observable[_ <: T])" -> "onExceptionResumeNext(Observable[U])", - "parallel(Func1[Observable[T], Observable[R]])" -> "parallel(Observable[T] => Observable[R])", - "parallel(Func1[Observable[T], Observable[R]], Scheduler)" -> "parallel(Observable[T] => Observable[R], Scheduler)", "publish(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "publish(Observable[T] => Observable[R])", "publish(Func1[_ >: Observable[T], _ <: Observable[R]], T)" -> "publish(Observable[T] => Observable[R], T @uncheckedVariance)", "publishLast(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "publishLast(Observable[T] => Observable[R])",