Skip to content

Commit

Permalink
Merge pull request #1178 from zsxwing/issue1173
Browse files Browse the repository at this point in the history
Fix issue #1173
  • Loading branch information
benjchristensen committed May 19, 2014
2 parents d64b3a1 + 039378f commit 62266af
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ object Notification {
* @param notification
* The [[rx.lang.scala.Notification]] to be deconstructed
* @return
* The [[java.lang.Throwable]] value contained in this notification.
* The `java.lang.Throwable` value contained in this notification.
*/
def unapply[U](notification: Notification[U]): Option[Throwable] = notification match {
case onError: OnError[U] => Some(onError.error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,28 @@ import collection.JavaConversions._
* the observer
* @define subscribeObserverParamScheduler
* the [[rx.lang.scala.Scheduler]] on which Observers subscribe to the Observable
*
* @define subscribeSubscriberMain
* Call this method to subscribe an [[Subscriber]] for receiving items and notifications from the [[Observable]].
*
* A typical implementation of `subscribe` does the following:
*
* It stores a reference to the Observer in a collection object, such as a `List[T]` object.
*
* It returns a reference to the [[rx.lang.scala.Subscription]] interface. This enables [[Subscriber]]s to
* unsubscribe, that is, to stop receiving items and notifications before the Observable stops
* sending them, which also invokes the Subscriber's [[rx.lang.scala.Observer.onCompleted onCompleted]] method.
*
* An [[Observable]] instance is responsible for accepting all subscriptions
* and notifying all [[Subscriber]]s. Unless the documentation for a particular
* [[Observable]] implementation indicates otherwise, [[Subscriber]]s should make no
* assumptions about the order in which multiple [[Subscriber]]s will receive their notifications.
*
* @define subscribeSubscriberParamObserver
* the [[Subscriber]]
* @define subscribeSubscriberParamScheduler
* the [[rx.lang.scala.Scheduler]] on which [[Subscriber]]s subscribe to the Observable
*
* @define subscribeAllReturn
* a [[rx.lang.scala.Subscription]] reference whose `unsubscribe` method can be called to stop receiving items
* before the Observable has finished sending them
Expand Down Expand Up @@ -125,6 +147,39 @@ trait Observable[+T]
*/
def apply(observer: Observer[T]): Subscription = subscribe(observer)

/**
* $subscribeSubscriberMain
*
* @param subscriber $subscribeSubscriberParamObserver
* @param scheduler $subscribeSubscriberParamScheduler
* @return $subscribeAllReturn
*/
def subscribe(subscriber: Subscriber[T], scheduler: Scheduler): Subscription = {
// Add the casting to avoid compile error "ambiguous reference to overloaded definition"
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
thisJava.subscribe(subscriber.asJavaSubscriber, scheduler)
}

/**
* $subscribeSubscriberMain
*
* @param subscriber $subscribeSubscriberParamObserver
* @return $subscribeAllReturn
*/
def subscribe(subscriber: Subscriber[T]): Subscription = {
// Add the casting to avoid compile error "ambiguous reference to overloaded definition"
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
thisJava.subscribe(subscriber.asJavaSubscriber)
}

/**
* $subscribeSubscriberMain
*
* @param subscriber $subscribeSubscriberParamObserver
* @return $subscribeAllReturn
*/
def apply(subscriber: Subscriber[T]): Subscription = subscribe(subscriber)

/**
* $subscribeCallbacksMainNoNotifications
*
Expand Down Expand Up @@ -287,7 +342,7 @@ trait Observable[+T]
*
* A well-behaved Observable does not interleave its invocations of the [[rx.lang.scala.Observer.onNext onNext]], [[rx.lang.scala.Observer.onCompleted onCompleted]], and [[rx.lang.scala.Observer.onError onError]] methods of
* its [[rx.lang.scala.Observer]]s; it invokes `onCompleted` or `onError` only once; and it never invokes `onNext` after invoking either `onCompleted` or `onError`.
* `synchronize` enforces this, and the Observable it returns invokes `onNext` and `onCompleted` or `onError` synchronously.
* [[Observable.serialize serialize]] enforces this, and the Observable it returns invokes `onNext` and `onCompleted` or `onError` synchronously.
*
* @return an Observable that is a chronologically well-behaved version of the source
* Observable, and that synchronously notifies its [[rx.lang.scala.Observer]]s
Expand Down Expand Up @@ -2405,8 +2460,7 @@ trait Observable[+T]

/**
* Perform work in parallel by sharding an `Observable[T]` on a
* [[rx.lang.scala.concurrency.Schedulers.threadPoolForComputation computation]]
* [[rx.lang.scala.Scheduler]] and return an `Observable[R]` with the output.
* [[rx.lang.scala.schedulers.ComputationScheduler]] and return an `Observable[R]` with the output.
*
* @param f
* a function that applies Observable operators to `Observable[T]` in parallel and returns an `Observable[R]`
Expand Down Expand Up @@ -2636,12 +2690,10 @@ trait Observable[+T]
* those emitted by the source Observable
* @throws IndexOutOfBoundsException
* if index is greater than or equal to the number of items emitted by the source
* Observable
* @throws IndexOutOfBoundsException
* if index is less than 0
* Observable, or index is less than 0
* @see `Observable.elementAt`
* @deprecated("Use `elementAt`", "0.18.0")
*/
@deprecated("Use `elementAt`", "0.18.0")
def apply(index: Int): Observable[T] = elementAt(index)

/**
Expand All @@ -2656,9 +2708,7 @@ trait Observable[+T]
* those emitted by the source Observable
* @throws IndexOutOfBoundsException
* if index is greater than or equal to the number of items emitted by the source
* Observable
* @throws IndexOutOfBoundsException
* if index is less than 0
* Observable, or index is less than 0
*/
def elementAt(index: Int): Observable[T] = {
toScalaObservable[T](asJavaObservable.elementAt(index))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ package rx.lang.scala
trait Subscriber[-T] extends Observer[T] with Subscription {

self =>

private [scala] override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubscriber
private [scala] override val asJavaSubscription: rx.Subscription = asJavaSubscriber


private [scala] val asJavaSubscriber: rx.Subscriber[_ >: T] = new rx.Subscriber[T] {
def onNext(value: T): Unit = self.onNext(value)
def onError(error: Throwable): Unit = self.onError(error)
def onCompleted(): Unit = self.onCompleted()
}


private [scala] override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubscriber
private [scala] override val asJavaSubscription: rx.Subscription = asJavaSubscriber

/**
* Used to register an unsubscribe callback.
*/
Expand All @@ -34,6 +34,8 @@ object Subscriber extends ObserverFactoryMethods[Subscriber] {

private[scala] def apply[T](subscriber: rx.Subscriber[T]): Subscriber[T] = new Subscriber[T] {
override val asJavaSubscriber = subscriber
override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubscriber
override val asJavaSubscription: rx.Subscription = asJavaSubscriber

override def onNext(value: T): Unit = asJavaSubscriber.onNext(value)
override def onError(error: Throwable): Unit = asJavaSubscriber.onError(error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ class CompositeSubscription private[scala] (override val asJavaSubscription: rx.

/**
* Adds a subscription to the group,
* or unsubscribes immediately is the [[rx.subscriptions.CompositeSubscription]] is unsubscribed.
* or unsubscribes immediately is the [[rx.lang.scala.subscriptions.CompositeSubscription]] is unsubscribed.
* @param subscription the subscription to be added.
* @return the [[rx.subscriptions.CompositeSubscription]] itself.
* @return the [[rx.lang.scala.subscriptions.CompositeSubscription]] itself.
*/
def +=(subscription: Subscription): this.type = {
asJavaSubscription.add(subscription.asJavaSubscription)
Expand All @@ -62,7 +62,7 @@ class CompositeSubscription private[scala] (override val asJavaSubscription: rx.
/**
* Removes and unsubscribes a subscription to the group,
* @param subscription the subscription be removed.
* @return the [[rx.subscriptions.CompositeSubscription]] itself.
* @return the [[rx.lang.scala.subscriptions.CompositeSubscription]] itself.
*/
def -=(subscription: Subscription): this.type = {
asJavaSubscription.remove(subscription.asJavaSubscription)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,16 @@ class CompletenessTest extends JUnitSuite {
"all(Func1[_ >: T, Boolean])" -> "forall(T => Boolean)",
"buffer(Long, Long, TimeUnit)" -> "buffer(Duration, Duration)",
"buffer(Long, Long, TimeUnit, Scheduler)" -> "buffer(Duration, Duration, Scheduler)",
"contains(T)" -> "contains(Any)",
"contains(Any)" -> "contains(U)",
"count()" -> "length",
"dematerialize()" -> "dematerialize(<:<[Observable[T], Observable[Notification[U]]])",
"elementAt(Int)" -> "[use `.drop(index).first`]",
"elementAtOrDefault(Int, T)" -> "[use `.drop(index).firstOrElse(default)`]",
"elementAtOrDefault(Int, T)" -> "elementAtOrDefault(Int, U)",
"first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
"firstOrDefault(T)" -> "firstOrElse(=> U)",
"firstOrDefault(Func1[_ >: T, Boolean], T)" -> "[use `.filter(condition).firstOrElse(default)`]",
"firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]",
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]",
"groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])",
"lift(Operator[_ <: R, _ >: T])" -> "lift(Subscriber[R] => Subscriber[T])",
"mapMany(Func1[_ >: T, _ <: Observable[_ <: R]])" -> "flatMap(T => Observable[R])",
"mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]",
"onErrorResumeNext(Func1[Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])",
"onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Observable[U])",
Expand All @@ -95,6 +94,7 @@ class CompletenessTest extends JUnitSuite {
"publish(Func1[_ >: Observable[T], _ <: Observable[R]], T)" -> "publish(Observable[U] => Observable[R], U)",
"reduce(Func2[T, T, T])" -> "reduce((U, U) => U)",
"reduce(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)",
"repeat()" -> "repeat()",
"retry()" -> "retry()",
"scan(Func2[T, T, T])" -> unnecessary,
"scan(R, Func2[R, _ >: T, R])" -> "scan(R)((R, T) => R)",
Expand All @@ -113,8 +113,8 @@ class CompletenessTest extends JUnitSuite {
"skipLast(Int)" -> "dropRight(Int)",
"skipLast(Long, TimeUnit)" -> "dropRight(Duration)",
"skipLast(Long, TimeUnit, Scheduler)" -> "dropRight(Duration, Scheduler)",
"takeFirst()" -> "first",
"takeFirst(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
"subscribe()" -> "subscribe()",
"takeFirst(Func1[_ >: T, Boolean])" -> "[use `filter(condition).take(1)`]",
"takeLast(Int)" -> "takeRight(Int)",
"takeWhileWithIndex(Func2[_ >: T, _ >: Integer, Boolean])" -> "[use `.zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)`]",
"timeout(Func0[_ <: Observable[U]], Func1[_ >: T, _ <: Observable[V]], Observable[_ <: T])" -> "timeout(() => Observable[U], T => Observable[V], Observable[O])",
Expand All @@ -126,7 +126,6 @@ class CompletenessTest extends JUnitSuite {
"toList()" -> "toSeq",
"toSortedList()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
"toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
"where(Func1[_ >: T, Boolean])" -> "filter(T => Boolean)",
"window(Long, Long, TimeUnit)" -> "window(Duration, Duration)",
"window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)",

Expand All @@ -135,32 +134,28 @@ class CompletenessTest extends JUnitSuite {
"averageDoubles(Observable[Double])" -> averageProblem,
"averageFloats(Observable[Float])" -> averageProblem,
"averageLongs(Observable[Long])" -> averageProblem,
"create(OnSubscribeFunc[T])" -> "apply(Observer[T] => Subscription)",
"create(OnSubscribeFunc[T])" -> "create(Observer[T] => Subscription)",
"create(OnSubscribe[T])" -> "apply(Subscriber[T] => Unit)",
"combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatest(Observable[U])",
"concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])",
"defer(Func0[_ <: Observable[_ <: T]])" -> "defer(=> Observable[T])",
"empty()" -> "apply(T*)",
"error(Throwable)" -> "apply(Throwable)",
"from(Array[T])" -> "apply(T*)",
"from(Iterable[_ <: T])" -> "apply(T*)",
"from(Array[T])" -> "[use `items(T*)`]",
"from(Iterable[_ <: T])" -> "from(Iterable[T])",
"from(Future[_ <: T])" -> fromFuture,
"from(Future[_ <: T], Long, TimeUnit)" -> fromFuture,
"from(Future[_ <: T], Scheduler)" -> fromFuture,
"just(T)" -> "apply(T*)",
"just(T)" -> "[use `items(T*)`]",
"just(T, Scheduler)" -> "[use `items(T*).subscribeOn(scheduler)`]",
"merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[U])",
"merge(Observable[_ <: Observable[_ <: T]])" -> "flatten(<:<[Observable[T], Observable[Observable[U]]])",
"mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])",
"mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])",
"range(Int, Int)" -> "apply(Range)",
"repeat()" -> "repeat()",
"retry()" -> "retry()",
"sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "[use `(first zip second) map (p => p._1 == p._2)`]",
"sequenceEqual(Observable[_ <: T], Observable[_ <: T], Func2[_ >: T, _ >: T, Boolean])" -> "[use `(first zip second) map (p => equality(p._1, p._2))`]",
"range(Int, Int)" -> "[use `(start until (start + count)).toObservable` instead of `range(start, count)`]",
"range(Int, Int, Scheduler)" -> "[use `(start until (start + count)).toObservable.subscribeOn(scheduler)` instead of `range(start, count, scheduler)`]`]",
"sum(Observable[Integer])" -> "sum(Numeric[U])",
"sumDoubles(Observable[Double])" -> "sum(Numeric[U])",
"sumFloats(Observable[Float])" -> "sum(Numeric[U])",
"sumLongs(Observable[Long])" -> "sum(Numeric[U])",
"synchronize(Observable[T])" -> "synchronize",
"switchDo(Observable[_ <: Observable[_ <: T]])" -> deprecated,
"switchOnNext(Observable[_ <: Observable[_ <: T]])" -> "switch(<:<[Observable[T], Observable[Observable[U]]])",
"zip(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "[use instance method `zip` and `map`]",
Expand All @@ -174,7 +169,7 @@ class CompletenessTest extends JUnitSuite {
"concat(" + _ + ")" -> "[unnecessary because we can use `++` instead or `Observable(o1, o2, ...).concat`]"
).drop(1).toMap ++ List.iterate("T", 10)(s => s + ", T").map(
// all 10 overloads of from:
"from(" + _ + ")" -> "apply(T*)"
"from(" + _ + ")" -> "[use `items(T*)`]"
).toMap ++ (3 to 9).map(i => {
// zip3-9:
val obsArgs = (1 to i).map(j => s"Observable[_ <: T$j], ").mkString("")
Expand Down Expand Up @@ -216,6 +211,8 @@ class CompletenessTest extends JUnitSuite {
// TODO how can we filter out instance methods which were put into companion because
// of extends AnyVal in a way which does not depend on implementation-chosen name '$extension'?
.filter(! _.contains("$extension"))
// `access$000` is public. How to distinguish it from others without hard-code?
.filter(! _.contains("access$000"))
}

// also applicable for Java types
Expand Down Expand Up @@ -373,7 +370,10 @@ class CompletenessTest extends JUnitSuite {
def escape(s: String) = s.replaceAllLiterally("[", "&lt;").replaceAllLiterally("]", "&gt;")

println("""
## Comparison of Scala Observable and Java Observable
---
layout: comparison
title: Comparison of Scala Observable and Java Observable
---
Note:
* This table contains both static methods and instance methods.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.lang.scala

import org.junit.Test
import org.junit.Assert.assertNotNull
import org.junit.Assert.assertTrue
import org.scalatest.junit.JUnitSuite

class SubscriberTests extends JUnitSuite {

@Test def testIssue1173() {
// https://github.com/Netflix/RxJava/issues/1173
val subscriber = Subscriber((n: Int) => println(n))
assertNotNull(subscriber.asJavaObserver)
assertNotNull(subscriber.asJavaSubscription)
assertNotNull(subscriber.asJavaSubscriber)
}

@Test def testUnsubscribeForSubscriber() {
var innerSubscriber: Subscriber[Int] = null
val o = Observable[Int](subscriber => {
Observable[Int](subscriber => {
innerSubscriber = subscriber
}).subscribe(subscriber)
})
o.subscribe().unsubscribe()
// If we unsubscribe outside, the inner Subscriber should also be unsubscribed
assertTrue(innerSubscriber.isUnsubscribed)
}

}

0 comments on commit 62266af

Please sign in to comment.