Skip to content

Commit

Permalink
Merge pull request ReactiveX#29 from zsxwing/rxjava-1701
Browse files Browse the repository at this point in the history
Merge Rxjava #1701
  • Loading branch information
benjchristensen committed Oct 7, 2014
2 parents b6cb8c4 + c718be8 commit 8982cff
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 52 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ scalaVersion in ThisBuild := "2.11.2"
crossScalaVersions in ThisBuild := Seq("2.10.4", "2.11.2")

libraryDependencies ++= Seq(
"io.reactivex" % "rxjava" % "1.0.0-rc.3",
"io.reactivex" % "rxjava" % "1.0.0-rc.4",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"junit" % "junit" % "4.11" % "test",
"org.scalatest" %% "scalatest" % "2.2.2" % "test")
17 changes: 0 additions & 17 deletions examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -689,23 +689,6 @@ class RxScalaDemo extends JUnitSuite {
x*x
}

def work(o1: Observable[Int]): Observable[String] = {
println(s"map() is being called on thread ${Thread.currentThread().getId}")
o1.map(i => s"The square of $i is ${square(i)}")
}

@Test def parallelExample() {
val t0 = System.currentTimeMillis()
Observable.from(1 to 10).parallel(work(_)).toBlocking.foreach(println(_))
println(s"Work took ${System.currentTimeMillis()-t0} ms")
}

@Test def exampleWithoutParallel() {
val t0 = System.currentTimeMillis()
work(Observable.from(1 to 10)).toBlocking.foreach(println(_))
println(s"Work took ${System.currentTimeMillis()-t0} ms")
}

@Test def toSortedList() {
assertEquals(Seq(7, 8, 9, 10), List(10, 7, 8, 9).toObservable.toSeq.map(_.sorted).toBlocking.single)
val f = (a: Int, b: Int) => b < a
Expand Down
5 changes: 5 additions & 0 deletions project/bintray.sbt
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
resolvers += Resolver.url(
"bintray-sbt-plugin-releases",
url("http://dl.bintray.com/content/sbt/sbt-plugin-releases"))(
Resolver.ivyStylePatterns)

addSbtPlugin("me.lessis" % "bintray-sbt" % "0.1.2")
4 changes: 2 additions & 2 deletions src/main/scala/rx/lang/scala/JavaConversions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ object JavaConversions {

implicit def toJavaTransformer[T, R](transformer: Observable[T] => Observable[R]): rx.Observable.Transformer[T, R] = {
new rx.Observable.Transformer[T, R] {
override def call(o: rx.Observable[_ <: T]): rx.Observable[R] = {
transformer(toScalaObservable(o)).asJavaObservable.asInstanceOf[rx.Observable[R]]
override def call(o: rx.Observable[_ <: T]): rx.Observable[_ <: R] = {
transformer(toScalaObservable(o)).asJavaObservable
}
}
}
Expand Down
29 changes: 0 additions & 29 deletions src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3485,35 +3485,6 @@ trait Observable[+T]
new BlockingObservable[T](this)
}

/**
* Perform work in parallel by sharding an `Observable[T]` on a
* [[rx.lang.scala.schedulers.ComputationScheduler]] and return an `Observable[R]` with the output.
*
* @param f
* a function that applies Observable operators to `Observable[T]` in parallel and returns an `Observable[R]`
* @return an Observable with the output of the function executed on a [[rx.lang.scala.Scheduler]]
*/
def parallel[R](f: Observable[T] => Observable[R]): Observable[R] = {
val fJava: Func1[rx.Observable[T], rx.Observable[R]] =
(jo: rx.Observable[T]) => f(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
toScalaObservable(asJavaObservable.asInstanceOf[rx.Observable[T]].parallel[R](fJava))
}

/**
* Perform work in parallel by sharding an `Observable[T]` on a [[rx.lang.scala.Scheduler]] and return an `Observable[R]` with the output.
*
* @param f
* a function that applies Observable operators to `Observable[T]` in parallel and returns an `Observable[R]`
* @param scheduler
* a [[rx.lang.scala.Scheduler]] to perform the work on.
* @return an Observable with the output of the function executed on a [[rx.lang.scala.Scheduler]]
*/
def parallel[R](f: Observable[T] => Observable[R], scheduler: Scheduler): Observable[R] = {
val fJava: Func1[rx.Observable[T], rx.Observable[R]] =
(jo: rx.Observable[T]) => f(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
toScalaObservable(asJavaObservable.asInstanceOf[rx.Observable[T]].parallel[R](fJava, scheduler))
}

/** Tests whether a predicate holds for some of the elements of this `Observable`.
*
* @param p the predicate used to test elements.
Expand Down
4 changes: 1 addition & 3 deletions src/test/scala/rx/lang/scala/CompletenessTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class CompletenessTest extends JUnitSuite {
"buffer(Observable[_ <: TOpening], Func1[_ >: TOpening, _ <: Observable[_ <: TClosing]])" -> "slidingBuffer(Observable[Opening])(Opening => Observable[Any])",
"cast(Class[R])" -> "[RxJava needs this one because `rx.Observable` is invariant. `Observable` in RxScala is covariant and does not need this operator.]",
"collect(R, Action2[R, _ >: T])" -> "foldLeft(R)((R, T) => R)",
"compose(Transformer[_ >: T, R])" -> "compose(Observable[T] => Observable[R])",
"compose(Transformer[_ >: T, _ <: R])" -> "compose(Observable[T] => Observable[R])",
"concatWith(Observable[_ <: T])" -> "[use `o1 ++ o2`]",
"contains(Any)" -> "contains(U)",
"count()" -> "length",
Expand Down Expand Up @@ -123,8 +123,6 @@ class CompletenessTest extends JUnitSuite {
"onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Observable[U])",
"onErrorReturn(Func1[Throwable, _ <: T])" -> "onErrorReturn(Throwable => U)",
"onExceptionResumeNext(Observable[_ <: T])" -> "onExceptionResumeNext(Observable[U])",
"parallel(Func1[Observable[T], Observable[R]])" -> "parallel(Observable[T] => Observable[R])",
"parallel(Func1[Observable[T], Observable[R]], Scheduler)" -> "parallel(Observable[T] => Observable[R], Scheduler)",
"publish(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "publish(Observable[T] => Observable[R])",
"publish(Func1[_ >: Observable[T], _ <: Observable[R]], T)" -> "publish(Observable[T] => Observable[R], T @uncheckedVariance)",
"publishLast(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "publishLast(Observable[T] => Observable[R])",
Expand Down

0 comments on commit 8982cff

Please sign in to comment.