Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scala Adaptor: Inheritance, subscriptions and subjects #490

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,22 @@
import rx.Observable;
import rx.util.functions.Action1;


public class MovieLibUsage {

Action1<Movie> moviePrinter = new Action1<Movie>() {
public void call(Movie m) {
System.out.println("A movie of length " + m.lengthInSeconds() + "s");
}
};

@Test
public void test() {
// TODO bindings backwards
/*
MovieLib lib = new MovieLib(Observable.from(new Movie(3000), new Movie(1000), new Movie(2000)));

lib.longMovies().subscribe(moviePrinter);

lib.longMovies().subscribe(moviePrinter);
*/
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ class RxScalaDemo extends JUnitSuite {
waitFor(firstMedalOfEachCountry)
}

@Ignore // TODO this test never terminates
@Test def olympicsExample() {
val (go, medals) = Olympics.mountainBikeMedals.publish
medals.subscribe(println(_))
Expand Down Expand Up @@ -383,7 +384,8 @@ class RxScalaDemo extends JUnitSuite {
}
}

@Test def materializeExample1() {
@Test(expected = classOf[RuntimeException])
def materializeExample1() {
def printObservable[T](o: Observable[T]): Unit = {
import Notification._
o.materialize.subscribe(n => n match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,21 @@ import rx.util.functions._
object ImplicitFunctionConversions {
import language.implicitConversions

implicit def javaSubscriptionToScalaSubscription(s: rx.Subscription): Subscription = {
Subscription(s)
}

implicit def scalaSubscriptionToJavaSubscription(s: Subscription): rx.Subscription = {
s.asJava
}

implicit def scalaObserverToJavaObserver[T](o: Observer[T]): rx.Observer[_ >: T] = {
o.asJava
}

implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription) =
new Func2[rx.Scheduler, T, Subscription] {
def call(s: rx.Scheduler, t: T): Subscription = {
new Func2[rx.Scheduler, T, rx.Subscription] {
def call(s: rx.Scheduler, t: T): rx.Subscription = {
action(s, t)
}
}
Expand All @@ -41,7 +53,7 @@ object ImplicitFunctionConversions {
implicit def scalaFunction1ToOnSubscribeFunc[T](f: rx.lang.scala.Observer[T] => Subscription) =
new rx.Observable.OnSubscribeFunc[T] {
def onSubscribe(obs: rx.Observer[_ >: T]): rx.Subscription = {
f(obs)
f(Observer(obs)).asJava
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ package rx.lang.scala
/**
* Emitted by Observables returned by [[Observable.materialize]].
*/
sealed trait Notification[+T] {
def asJava: rx.Notification[_ <: T]
}
// sealed because all its subclasses must be defined in this file
sealed trait Notification[+T] extends JavaWrapper[rx.Notification[_ <: T]] {}

/**
* Provides pattern matching support and constructors for Notifications.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,8 @@ package rx.lang.scala
*
*
*/
// constructor is private because users should use apply in companion
class Observable[+T] private[scala] (val asJava: rx.Observable[_ <: T])
// Uncommenting this line combined with `new Observable(...)` instead of `new Observable[T](...)`
// makes the compiler crash
extends AnyVal
{
trait Observable[+T] extends JavaWrapper[rx.Observable[_ <: T]] {

import scala.collection.JavaConverters._
import scala.collection.Seq
import scala.concurrent.duration.{Duration, TimeUnit}
Expand Down Expand Up @@ -187,7 +183,7 @@ class Observable[+T] private[scala] (val asJava: rx.Observable[_ <: T])
* is called, the Observable starts to push results into the specified Subject
*/
def multicast[R](subject: Subject[T, R]): (() => Subscription, Observable[R]) = {
val javaCO = asJava.multicast[R](subject)
val javaCO = asJava.multicast[R](subject.asJava)
(() => javaCO.connect(), Observable[R](javaCO))
}

Expand Down Expand Up @@ -508,7 +504,7 @@ class Observable[+T] private[scala] (val asJava: rx.Observable[_ <: T])
def window(closings: () => Observable[Closing]): Observable[Observable[T]] = {
val func : Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJava
val o1: rx.Observable[_ <: rx.Observable[_]] = asJava.window(func)
val o2 = new Observable[rx.Observable[_]](o1).map((x: rx.Observable[_]) => {
val o2 = Observable[rx.Observable[_]](o1).map((x: rx.Observable[_]) => {
val x2 = x.asInstanceOf[rx.Observable[_ <: T]]
Observable[T](x2)
})
Expand Down Expand Up @@ -1842,21 +1838,23 @@ object Observable {

private[scala]
def jObsOfListToScObsOfSeq[T](jObs: rx.Observable[_ <: java.util.List[T]]): Observable[Seq[T]] = {
val oScala1: Observable[java.util.List[T]] = new Observable[java.util.List[T]](jObs)
val oScala1: Observable[java.util.List[T]] = Observable[java.util.List[T]](jObs)
oScala1.map((lJava: java.util.List[T]) => lJava.asScala)
}

private[scala]
def jObsOfJObsToScObsOfScObs[T](jObs: rx.Observable[_ <: rx.Observable[_ <: T]]): Observable[Observable[T]] = {
val oScala1: Observable[rx.Observable[_ <: T]] = new Observable[rx.Observable[_ <: T]](jObs)
oScala1.map((oJava: rx.Observable[_ <: T]) => new Observable[T](oJava))
val oScala1: Observable[rx.Observable[_ <: T]] = Observable[rx.Observable[_ <: T]](jObs)
oScala1.map((oJava: rx.Observable[_ <: T]) => Observable[T](oJava))
}

private[Observable] class ObservableWrapper[+T](val asJava: rx.Observable[_ <: T]) extends Observable[T] {}

/**
* Creates a new Scala Observable from a given Java Observable.
*/
def apply[T](asJava: rx.Observable[_ <: T]): Observable[T] = {
new Observable[T](asJava)
new ObservableWrapper[T](asJava)
}

/**
Expand Down Expand Up @@ -2030,7 +2028,7 @@ object Observable {
* @return An Observable that emits a number each time interval.
*/
def interval(duration: Duration): Observable[Long] = {
(new Observable[java.lang.Long](JObservable.interval(duration.length, duration.unit))).map(_.longValue())
(Observable[java.lang.Long](JObservable.interval(duration.length, duration.unit))).map(_.longValue())
}

/**
Expand All @@ -2045,7 +2043,7 @@ object Observable {
* @return An Observable that emits a number each time interval.
*/
def interval(duration: Duration, scheduler: Scheduler): Observable[Long] = {
(new Observable[java.lang.Long](JObservable.interval(duration.length, duration.unit, scheduler))).map(_.longValue())
(Observable[java.lang.Long](JObservable.interval(duration.length, duration.unit, scheduler))).map(_.longValue())
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package rx.lang.scala

/**
* Provides a mechanism for receiving push-based notifications.
*
* After an Observer calls an [[Observable]]'s `subscribe` method, the Observable
* calls the Observer's `onNext` method to provide notifications. A well-behaved Observable will
* call an Observer's `onCompleted` method exactly once or the Observer's `onError` method exactly once.
*/
trait Observer[-T] extends JavaWrapper[rx.Observer[_ >: T]] {

/**
* Notifies the Observer that the [[Observable]] has finished sending push-based notifications.
*
* The [[Observable]] will not call this method if it calls `onError`.
*/
def onCompleted(): Unit = {
asJava.onCompleted()
}

/**
* Notifies the Observer that the [[Observable]] has experienced an error condition.
*
* If the [[Observable]] calls this method, it will not thereafter call `onNext` or `onCompleted`.
*/
def onError(e: Throwable): Unit = {
asJava.onError(e)
}

/**
* Provides the Observer with new data.
*
* The [[Observable]] calls this closure 0 or more times.
*
* The [[Observable]] will not call this method again after it calls either `onCompleted` or `onError`.
*/
def onNext(arg: T): Unit = {
asJava.onNext(arg)
}
}

object Observer {
private[Observer] class ObserverWrapper[-T](val asJava: rx.Observer[_ >: T]) extends Observer[T]

def apply[T](asJava: rx.Observer[_ >: T]): Observer[T] = {
new ObserverWrapper(asJava)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,14 @@ import org.mockito.Mockito.times
import org.mockito.Mockito.verify
import org.scalatest.junit.JUnitSuite

import rx.lang.scala.ImplicitFunctionConversions.scalaFunction0ProducingUnitToAction0
import rx.lang.scala.ImplicitFunctionConversions.schedulerActionToFunc2
import rx.lang.scala.ImplicitFunctionConversions._
import rx.lang.scala.concurrency.TestScheduler


/**
* Represents an object that schedules units of work.
*/
trait Scheduler {
def asJava: rx.Scheduler
trait Scheduler extends JavaWrapper[rx.Scheduler] {

/**
* Schedules a cancelable action to be executed.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package rx.lang.scala

/**
* Subscriptions are returned from all `Observable.subscribe` methods to allow unsubscribing.
*
* This interface is the equivalent of `IDisposable` in the .NET Rx implementation.
*/
trait Subscription extends JavaWrapper[rx.Subscription] {

/**
* Call this method to stop receiving notifications on the Observer that was registered when
* this Subscription was received.
*/
def unsubscribe(): Unit = {
asJava.unsubscribe()
}
}

object Subscription {
private[Subscription] class SubscriptionWrapper(val asJava: rx.Subscription) extends Subscription {}

def apply(asJava: rx.Subscription): Subscription = {
// no need to care if it's a subclass of rx.Subscription
new SubscriptionWrapper(asJava)
}

private[Subscription] class SubscriptionFromFunc(unsubscribe: => Unit) extends Subscription {
val asJava: rx.Subscription = rx.subscriptions.Subscriptions.create(
ImplicitFunctionConversions.scalaFunction0ProducingUnitToAction0(unsubscribe))
}

/**
* Creates an [[rx.lang.scala.Subscription]] that invokes the specified action when unsubscribed.
*/
def apply(u: => Unit): Subscription = {
new SubscriptionFromFunc(u)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,11 @@ private class UnitTest extends JUnitSuite {
import org.mockito.Mockito._

val scheduler = TestScheduler()
// a Java observer
val observer = mock(classOf[rx.Observer[Long]])

val o = Observable.interval(1 second, scheduler)
val sub = o.subscribe(observer)
val sub = o.subscribe(Observer(observer))

verify(observer, never).onNext(0L)
verify(observer, never).onCompleted()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@ package rx.lang.scala.observables

import scala.collection.JavaConverters._
import rx.lang.scala.ImplicitFunctionConversions._
import rx.lang.scala.JavaWrapper

/**
* An Observable that provides blocking operators.
*
* You can obtain a BlockingObservable from an Observable using [[Observable.toBlockingObservable]]
*/
// constructor is private because users should use Observable.toBlockingObservable
class BlockingObservable[+T] private[scala] (val asJava: rx.observables.BlockingObservable[_ <: T])
extends AnyVal
{
class BlockingObservable[+T] private[scala] (val asJava: rx.observables.BlockingObservable[_ <: T]) extends JavaWrapper[rx.observables.BlockingObservable[_ <: T]] {

/**
* Invoke a method on each item emitted by the {@link Observable}; block until the Observable
Expand Down
Loading