Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scala Adaptor Improvements #389

Merged
merged 12 commits into from
Sep 18, 2013
67 changes: 65 additions & 2 deletions language-adaptors/rxjava-scala/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,71 @@
# Scala Adaptor for RxJava

There's an old Scala adaptor ( `rx.lang.scala.RxImplicits` with test `rx.lang.scala.RxImplicitsTest` ), which is deprecated. All other classes in `rx.lang.scala` belong to the new adaptor.
This adaptor allows to use RxJava in Scala with anonymous functions, e.g.

# Binaries
```scala
val o = Observable.interval(200 millis).take(5)
o.subscribe(n => println("n = " + n))
Observable(1, 2, 3, 4).reduce(_ + _)
```

For-comprehensions are also supported:

```scala
val first = Observable(10, 11, 12)
val second = Observable(10, 11, 12)
val booleans = for ((n1, n2) <- (first zip second)) yield (n1 == n2)
```

Further, this adaptor attempts to expose an API which is as Scala-idiomatic as possible. This means that certain methods have been renamed, their signature was changed, or static methods were changed to instance methods. Some examples:

```scala
// instead of concat:
def ++[U >: T](that: Observable[U]): Observable[U]

// instance method instead of static:
def zip[U](that: Observable[U]): Observable[(T, U)]

// the implicit evidence argument ensures that dematerialize can only be called on Observables of Notifications:
def dematerialize[U](implicit evidence: T <:< Notification[U]): Observable[U]

// additional type parameter U with lower bound to get covariance right:
def onErrorResumeNext[U >: T](resumeFunction: Throwable => Observable[U]): Observable[U]

// curried in Scala collections, so curry fold also here:
def fold[R](initialValue: R)(accumulator: (R, T) => R): Observable[R]

// using Duration instead of (long timepan, TimeUnit duration):
def sample(duration: Duration): Observable[T]

// called skip in Java, but drop in Scala
def drop(n: Int): Observable[T]

// there's only mapWithIndex in Java, because Java doesn't have tuples:
def zipWithIndex: Observable[(T, Int)]

// corresponds to Java's toList:
def toSeq: Observable[Seq[T]]

// the implicit evidence argument ensures that switch can only be called on Observables of Observables:
def switch[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U]

// Java's from becomes apply, and we use Scala Range
def apply(range: Range): Observable[Int]

// use Bottom type:
def never: Observable[Nothing]
```

Also, the Scala Observable is fully covariant in its type parameter, whereas the Java Observable only achieves partial covariance due to limitations of Java's type system (or if you can fix this, your suggestions are very welcome).

For more examples, see [RxScalaDemo.scala](https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/examples/RxScalaDemo.scala).

Scala code using Rx should only import members from `rx.lang.scala` and below.

Work on this adaptor is still in progress, and for the moment, the best source of documentation are the comments in the source code of [`rx.lang.scala.Observable`](https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala).


## Binaries

Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22rxjava-scala%22).

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala

import java.{ lang => jlang }
import rx.util.functions._

/**
* These function conversions convert between Scala functions and Rx Funcs and Actions.
* Most users RxScala won't need them, but they might be useful if one wants to use
* the rx.Observable directly instead of using rx.lang.scala.Observable or if one wants
* to use a Java library taking/returning Funcs and Actions.
*/
object ImplicitFunctionConversions {
import language.implicitConversions

implicit def scalaFunction1ToOnSubscribeFunc[T](f: rx.lang.scala.Observer[T] => Subscription) =
new rx.Observable.OnSubscribeFunc[T] {
def onSubscribe(obs: Observer[_ >: T]): Subscription = {
f(obs)
}
}

/**
* Converts a by-name parameter to a Rx Func0
*/
implicit def scalaByNameParamToFunc0[B](param: => B): Func0[B] =
new Func0[B] {
def call(): B = param
}

/**
* Converts 0-arg function to Rx Action0
*/
implicit def scalaFunction0ProducingUnitToAction0(f: (() => Unit)): Action0 =
new Action0 {
def call(): Unit = f()
}

/**
* Converts 1-arg function to Rx Action1
*/
implicit def scalaFunction1ProducingUnitToAction1[A](f: (A => Unit)): Action1[A] =
new Action1[A] {
def call(a: A): Unit = f(a)
}

/**
* Converts 1-arg predicate to Rx Func1[A, java.lang.Boolean]
*/
implicit def scalaBooleanFunction1ToRxBooleanFunc1[A](f: (A => Boolean)): Func1[A, jlang.Boolean] =
new Func1[A, jlang.Boolean] {
def call(a: A): jlang.Boolean = f(a).booleanValue
}

/**
* Converts 2-arg predicate to Rx Func2[A, B, java.lang.Boolean]
*/
implicit def scalaBooleanFunction2ToRxBooleanFunc1[A, B](f: ((A, B) => Boolean)): Func2[A, B, jlang.Boolean] =
new Func2[A, B, jlang.Boolean] {
def call(a: A, b: B): jlang.Boolean = f(a, b).booleanValue
}

/**
* Converts a specific function shape (used in takeWhile) to the equivalent Java types with an Rx Func2
*/
implicit def convertTakeWhileFuncToRxFunc2[A](f: (A, Int) => Boolean): Func2[A, jlang.Integer, jlang.Boolean] =
new Func2[A, jlang.Integer, jlang.Boolean] {
def call(a: A, b: jlang.Integer): jlang.Boolean = f(a, b).booleanValue
}

/**
* Converts a function shaped ilke compareTo into the equivalent Rx Func2
*/
implicit def convertComparisonFuncToRxFunc2[A](f: (A, A) => Int): Func2[A, A, jlang.Integer] =
new Func2[A, A, jlang.Integer] {
def call(a1: A, a2: A): jlang.Integer = f(a1, a2).intValue
}

/**
* This implicit allows Scala code to use any exception type and still work
* with invariant Func1 interface
*/
implicit def exceptionFunction1ToRxExceptionFunc1[A <: Exception, B](f: (A => B)): Func1[Exception, B] =
new Func1[Exception, B] {
def call(ex: Exception): B = f(ex.asInstanceOf[A])
}

/**
* The following implicits convert functions of different arities into the Rx equivalents
*/
implicit def scalaFunction0ToRxFunc0[A](f: () => A): Func0[A] =
new Func0[A] {
def call(): A = f()
}

implicit def scalaFunction1ToRxFunc1[A, B](f: (A => B)): Func1[A, B] =
new Func1[A, B] {
def call(a: A): B = f(a)
}

implicit def scalaFunction2ToRxFunc2[A, B, C](f: (A, B) => C): Func2[A, B, C] =
new Func2[A, B, C] {
def call(a: A, b: B) = f(a, b)
}

implicit def scalaFunction3ToRxFunc3[A, B, C, D](f: (A, B, C) => D): Func3[A, B, C, D] =
new Func3[A, B, C, D] {
def call(a: A, b: B, c: C) = f(a, b, c)
}

implicit def scalaFunction4ToRxFunc4[A, B, C, D, E](f: (A, B, C, D) => E): Func4[A, B, C, D, E] =
new Func4[A, B, C, D, E] {
def call(a: A, b: B, c: C, d: D) = f(a, b, c, d)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package rx.lang.scala
import org.scalatest.junit.JUnitSuite
import scala.collection.Seq
import rx.lang.scala.observables.BlockingObservable
import rx.lang.scala.observables.ConnectableObservable


/**
Expand All @@ -38,7 +37,8 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
import rx.util.functions._
import rx.lang.scala.{Notification, Subscription, Scheduler, Observer}
import rx.lang.scala.util._
import rx.lang.scala.internal.ImplicitFunctionConversions._
import rx.lang.scala.subjects.Subject
import rx.lang.scala.ImplicitFunctionConversions._

/**
* An {@link Observer} must call an Observable's {@code subscribe} method in order to
Expand Down Expand Up @@ -132,11 +132,13 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
* into
* @param <R>
* result type
* @return a {@link ConnectableObservable} that upon connection causes the source Observable to
* push results into the specified {@link Subject}
* @return a pair of a start function and an {@link Observable} such that when the start function
* is called, the Observable starts to push results into the specified {@link Subject}
*/
// public <R> ConnectableObservable<R> multicast(Subject<T, R> subject) TODO

def multicast[R](subject: Subject[T, R]): (() => Subscription, Observable[R]) = {
val javaCO = asJava.multicast[R](subject)
(() => javaCO.connect(), Observable[R](javaCO))
}

/**
* Returns an Observable that first emits the items emitted by this, and then the items emitted
Expand Down Expand Up @@ -904,11 +906,12 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/replay.png">
*
* @return a {@link ConnectableObservable} that upon connection causes the source Observable to
* emit items to its {@link Observer}s
* @return a pair of a start function and an {@link Observable} such that when the start function
* is called, the Observable starts to emit items to its {@link Observer}s
*/
def replay(): ConnectableObservable[T] = {
new ConnectableObservable[T](asJava.replay())
def replay(): (() => Subscription, Observable[T]) = {
val javaCO = asJava.replay()
(() => javaCO.connect(), Observable[T](javaCO))
}

/**
Expand Down Expand Up @@ -937,11 +940,12 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/publishConnect.png">
*
* @return a {@link ConnectableObservable} that upon connection causes the source Observable to
* emit items to its {@link Observer}s
* @return a pair of a start function and an {@link Observable} such that when the start function
* is called, the Observable starts to emit items to its {@link Observer}s
*/
def publish: ConnectableObservable[T] = {
new ConnectableObservable[T](asJava.publish())
def publish: (() => Subscription, Observable[T]) = {
val javaCO = asJava.publish()
(() => javaCO.connect(), Observable[T](javaCO))
}

// There is no aggregate function with signature
Expand Down Expand Up @@ -1215,51 +1219,25 @@ class Observable[+T](val asJava: rx.Observable[_ <: T])
// because we can just use ++ instead

/**
* Groups the items emitted by an Observable according to a specified criterion, and emits these
* grouped items as {@link GroupedObservable}s, one GroupedObservable per group.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/groupBy.png">
* Groups the items emitted by this Observable according to a specified discriminator function.
*
* @param keySelector
* @param f
* a function that extracts the key from an item
* @param elementSelector
* a function to map a source item to an item in a {@link GroupedObservable}
* @param <K>
* the key type
* @param <R>
* the type of items emitted by the resulting {@link GroupedObservable}s
* @return an Observable that emits {@link GroupedObservable}s, each of which corresponds to a
* unique key value and emits items representing items from the source Observable that
* share that key value
*/
/* TODO make a Scala GroupedObservable and groupBy
def groupBy[K,R](keySelector: T => K, elementSelector: T => R ): Observable[GroupedObservable[K,R]] = {
???
}
*/
// public <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector)

/**
* Groups the items emitted by an Observable according to a specified criterion, and emits these
* grouped items as {@link GroupedObservable}s, one GroupedObservable per group.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/groupBy.png">
*
* @param keySelector
* a function that extracts the key for each item
* @param <K>
* the key type
* @return an Observable that emits {@link GroupedObservable}s, each of which corresponds to a
* unique key value and emits items representing items from the source Observable that
* share that key value
* the type of keys returned by the discriminator function.
* @return an Observable that emits {@code (key, observable)} pairs, where {@code observable}
* contains all items for which {@code f} returned {@code key}.
*/
/* TODO
def groupBy[K](keySelector: T => K ): Observable[GroupedObservable[K,T]] = {
???
def groupBy[K](f: T => K): Observable[(K, Observable[T])] = {
val o1 = asJava.groupBy[K](f) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]]
val func = (o: rx.observables.GroupedObservable[K, _ <: T]) => (o.getKey(), Observable[T](o))
Observable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
}
*/
// public <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ? extends K> keySelector)

// There's no method corresponding to
// public <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector)
// because this can be obtained by combining groupBy and map (as in Scala)

/**
* Given an Observable that emits Observables, creates a single Observable that
* emits the items emitted by the most recently published of those Observables.
Expand Down Expand Up @@ -1482,7 +1460,7 @@ object Observable {
import rx.{Observable => JObservable}
import rx.lang.scala.{Notification, Subscription, Scheduler, Observer}
import rx.lang.scala.util._
import rx.lang.scala.internal.ImplicitFunctionConversions._
import rx.lang.scala.ImplicitFunctionConversions._

private[scala]
def jObsOfListToScObsOfSeq[T](jObs: rx.Observable[_ <: java.util.List[T]]): Observable[Seq[T]] = {
Expand Down Expand Up @@ -1800,7 +1778,7 @@ object Observable {
// "implementation restriction: nested class is not allowed in value class.
// This restriction is planned to be removed in subsequent releases."
class WithFilter[+T] private[scala] (p: T => Boolean, asJava: rx.Observable[_ <: T]) {
import rx.lang.scala.internal.ImplicitFunctionConversions._
import rx.lang.scala.ImplicitFunctionConversions._

def map[B](f: T => B): Observable[B] = {
Observable[B](asJava.filter(p).map[B](f))
Expand Down Expand Up @@ -1852,7 +1830,7 @@ class UnitTestSuite extends JUnitSuite {

@Test def testTest() = {
val a: Observable[Int] = Observable()
assertEquals(4, Observable(1, 2, 3, 4).toBlockingObservable.last)
assertEquals(4, Observable(1, 2, 3, 4).toBlockingObservable.toIterable.last)
}

}
Loading