Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Scala Bindings #503

Merged
merged 3 commits into from
Nov 20, 2013
Merged

New Scala Bindings #503

merged 3 commits into from
Nov 20, 2013

Conversation

headinthebox
Copy link
Contributor

Alternative Rx bindings for Scala

The previous RxScala binding attempt to optimize for seamless interop between Scala and Java.
The intended interop is illustrated by the following example where in Scala a class is defined that takes
an Observable[Movie] that is transformed using RxScala operators:

class MovieLib(val moviesStream: Observable[Movie]) {
   val threshold = 1200
   def shortMovies: Observable[Movie] = ???
   def longMovies: Observable[Movie] = ???
}

which is then called in Java, passing a Java Observable<Movie> to the constructor

public void test() {
   MovieLib lib = new MovieLib(Observable.from(...));

   lib.longMovies().subscribe(moviePrinter);
}

The technique used to obtain this transparency is to use a value class with a private constructor that implements
the Rx operators in an idiomatic Scala way, and a companion object that is used to construct instances in Scala

object Observable {
   def apply[T](asJava: rx.Observable[_ <: T]): Observable[T] = { new Observable[T](asJava) }
}

class Observable[+T] private[scala] (val asJava: rx.Observable[_ <: T])  extends AnyVal {
   // Idiomatic Scala friendly definitions of Rx operators
}

Since rx.lang.scala.Observable[T] extends AnyVal, the underlying representation of rx.lang.scala.Observable[T]
is the same as rx.Observable<T>. Because rx.lang.scala.Observable[T] is an opaque type in Scala,
the Scala programmer only sees the Scala-friendly operators.

However, in the current the illusion of interop is quickly lost when going beyond this simple example.
For example but type Notification[T] and Scheduler[T] are defined using wrappers,
and hence they are not compatible with Notification<T> respectively Scheduler<T>.
For instance, when materializing an Observable[T] in Scala to an Observable[Notification[T]],
we lost the seamless interop with Observable<Notification<T>> on the Java side.

However, the real problems with seamless interop show up when we try to creating bindings for other Rx types.
In particular types that have inheritance or more structure.

For example, RxScala currently defines a type synonym type Observer[-T] = rx.Observer[_ >: T],
but no further bindings for observers.
Similarly, for subjects RxScala defines type Subject[-T, +R] = rx.subjects.Subject[_ >: T, _ <: R].
The problem with these definitions is that on the Java side, subjects are defined as:

public abstract class Subject<T, R> extends Observable<R> implements Observer<T> { …}

without binding any of the Rx subjects.

The consequence is that Subject[S,T] in Scala is unrelated to rx.lang.scala.Observable[T] in Scala,
but shows up as a rx.Observable[T]. The problem however is that if we want to expose subjects in Scala
such that they derive from both Observable[S] and Observer[T] we cannot use the extend AnyVal trick
we used for Observable[T] and immediately lose transparent interop with Java.

The problem is even worse because AsyncSubject<T>, BehaviorSubject<T>, … all derive from Subject<T,T>,
so if we want them to derive from a common base Subject[T,T] type in Scala we lose transparency for those as well.
And again, if we expose the various subjects by extending AnyVal, they are useless in Scala because they do not inherit
from a common base type. To avoid implementing all methods of observable and observer on each specific subject
we might add implicit conversions to Observable[T] and Observer[T] but that still does not give Scala users
a native Subject[S,T] type.

object AsyncSubject {
    def apply[T](): AsyncSubject[T] =
      new AsyncSubject[T](rx.subjects.AsyncSubject.create())
}

class AsyncSubject[T] private [scala] (val inner: rx.subjects.AsyncSubject[T])
    extends AnyVal
{ … }

implicit final def asObservable[T](subject: AsyncSubject[T]): Observable[T] =
  Observable(subject.inner)

implicit final def asObserver[T](subject: AsyncSubject[T]): Observer[T] =
  subject.inner

The inheritance problem is not just limited to subjects, but also surfaces for subscriptions.
Rx scala currently defines type Subscription = rx.Subscription using a type synonym as well,
and we run into exactly the same problems as with subjects when we try to bind the
various Rx subscriptions BooleanSubscription, SerialSubscription, etc.

Since we cannot wrap Rx types in Scala such that they are both (a) transparently interoperable with Java,
and (b) feel native and idiomatic to Scala, we should decide in favor of optimizing RxScala for Scala
and consumption of Rx values from Java but not for Scala as a producer.

The new bindings feel like a completely native Scala library, without needing any complications of the Scala side.

object Observer { …}
trait Observable[+T] {
   def asJavaObservable: rx.Observable[_ <: T]
}

object Observer {…}
trait Observer[-T] {
  def asJavaObserver: rx.Observer[_ >: T]
}

object Subject {…}
trait Subject[-T, +R] extends Observable[R] with Observer[T] {
  val asJavaSubject: rx.subjects.Subject[_ >: T, _<: R]
}

object Scheduler {…}
trait Scheduler {
   def asJavaScheduler: rx.Scheduler;
}

object Notification {…}
trait Notification[+T] {
  def asJavaNotification: rx.Notification[_ <: T]
}

object Subscription {…}
trait Subscription {
   def asJavaSubscription: rx.Subscription
}

You pay the price when crossing the Scala/Java interop boundary, which is where it should be.
The proper way is to put the burden of interop on the Scala side, in case you want to create
a reusable Rx-based library in Scala, or wrap and unwrap on the Java side.

public static void main(String[] args) {

   Observable<Movie> movies = Observable.from(new Movie(3000), new Movie(1000), new Movie(2000));
   MovieLib lib = new MovieLib(toScalaObservable(movies));
   lib.longMovies().asJavaObservable().subscribe(m ->
      System.out.println("A movie of length " + m.lengthInSeconds() + "s")
   );
}

Delegation versus Inheritance

The obvious thought is that using delegation instead of inheritance (http://c2.com/cgi/wiki?DelegationIsInheritance)
will lead to excessive wrapping, since all Scala types wrap and delegate to an underlying RxJava implementation.
Note however, that the wrapping happens at query generation time and incurs no overhead when messages are flowing
through the pipeline. Say we have a query xs.map(f).filter(p).subscribe(o). Even though the Scala types are wrappers,
the callback that is registered with xs is something like x => { val y = f(x); if(p(y)){ o.asJavaObserver.onNext(y) }}
and hence there is no additional runtime penalty.

benjchristensen added a commit that referenced this pull request Nov 20, 2013
@benjchristensen benjchristensen merged commit fc3e08b into ReactiveX:master Nov 20, 2013
@cloudbees-pull-request-builder

RxJava-pull-requests #427 SUCCESS
This pull request looks good

rickbw pushed a commit to rickbw/RxJava that referenced this pull request Jan 9, 2014
@headinthebox headinthebox deleted the scala-bindings branch January 21, 2014 21:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants