From 8cf821abe664b2b0ef29c1a70c3355bec1a7b712 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 9 May 2014 17:18:13 +0800 Subject: [PATCH 1/6] Fix issue #1173 --- .../main/scala/rx/lang/scala/Subscriber.scala | 2 ++ .../scala/rx/lang/scala/SubscriberTests.scala | 32 +++++++++++++++++++ 2 files changed, 34 insertions(+) create mode 100644 language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriberTests.scala diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala index f19abebde1..38934742c9 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala @@ -34,6 +34,8 @@ object Subscriber extends ObserverFactoryMethods[Subscriber] { private[scala] def apply[T](subscriber: rx.Subscriber[T]): Subscriber[T] = new Subscriber[T] { override val asJavaSubscriber = subscriber + override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubscriber + override val asJavaSubscription: rx.Subscription = asJavaSubscriber override def onNext(value: T): Unit = asJavaSubscriber.onNext(value) override def onError(error: Throwable): Unit = asJavaSubscriber.onError(error) diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriberTests.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriberTests.scala new file mode 100644 index 0000000000..f5af3dafca --- /dev/null +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriberTests.scala @@ -0,0 +1,32 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.lang.scala + +import org.junit.Test +import org.junit.Assert.assertNotNull +import org.scalatest.junit.JUnitSuite + +class SubscriberTests extends JUnitSuite { + + @Test def testIssue1173() { + // https://github.com/Netflix/RxJava/issues/1173 + val subscriber = Subscriber((n: Int) => println(n)) + assertNotNull(subscriber.asJavaObserver) + assertNotNull(subscriber.asJavaSubscription) + assertNotNull(subscriber.asJavaSubscriber) + } + +} From a20bed894a0162c628bf77ee1e42d0eb32a85fe9 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sat, 10 May 2014 15:54:30 +0800 Subject: [PATCH 2/6] Fix the bug that the inner Subscriber can not be unsubscribed and elimilate scaladoc warnings --- .../scala/rx/lang/scala/Notification.scala | 2 +- .../main/scala/rx/lang/scala/Observable.scala | 68 ++++++++++++++++--- .../subscriptions/CompositeSubscription.scala | 6 +- .../scala/rx/lang/scala/SubscriberTests.scala | 13 ++++ 4 files changed, 76 insertions(+), 13 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala index 84f1642dbc..c9e8cd4822 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Notification.scala @@ -137,7 +137,7 @@ object Notification { * @param notification * The [[rx.lang.scala.Notification]] to be deconstructed * @return - * The [[java.lang.Throwable]] value contained in this notification. + * The `java.lang.Throwable` value contained in this notification. */ def unapply[U](notification: Notification[U]): Option[Throwable] = notification match { case onError: OnError[U] => Some(onError.error) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 9885797dd4..a3924d3e56 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -48,6 +48,28 @@ import collection.JavaConversions._ * the observer * @define subscribeObserverParamScheduler * the [[rx.lang.scala.Scheduler]] on which Observers subscribe to the Observable + * + * @define subscribeSubscriberMain + * Call this method to subscribe an [[Subscriber]] for receiving items and notifications from the [[Observable]]. + * + * A typical implementation of `subscribe` does the following: + * + * It stores a reference to the Observer in a collection object, such as a `List[T]` object. + * + * It returns a reference to the [[rx.lang.scala.Subscription]] interface. This enables [[Subscriber]]s to + * unsubscribe, that is, to stop receiving items and notifications before the Observable stops + * sending them, which also invokes the Subscriber's [[rx.lang.scala.Observer.onCompleted onCompleted]] method. + * + * An [[Observable]] instance is responsible for accepting all subscriptions + * and notifying all [[Subscriber]]s. Unless the documentation for a particular + * [[Observable]] implementation indicates otherwise, [[Subscriber]]s should make no + * assumptions about the order in which multiple [[Subscriber]]s will receive their notifications. + * + * @define subscribeSubscriberParamObserver + * the [[Subscriber]] + * @define subscribeSubscriberParamScheduler + * the [[rx.lang.scala.Scheduler]] on which [[Subscriber]]s subscribe to the Observable + * * @define subscribeAllReturn * a [[rx.lang.scala.Subscription]] reference whose `unsubscribe` method can be called to stop receiving items * before the Observable has finished sending them @@ -125,6 +147,39 @@ trait Observable[+T] */ def apply(observer: Observer[T]): Subscription = subscribe(observer) + /** + * $subscribeSubscriberMain + * + * @param subscriber $subscribeSubscriberParamObserver + * @param scheduler $subscribeSubscriberParamScheduler + * @return $subscribeAllReturn + */ + def subscribe(subscriber: Subscriber[T], scheduler: Scheduler): Subscription = { + // Add the casting to avoid compile error "ambiguous reference to overloaded definition" + val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]] + thisJava.subscribe(subscriber.asJavaSubscriber, scheduler) + } + + /** + * $subscribeSubscriberMain + * + * @param subscriber $subscribeSubscriberParamObserver + * @return $subscribeAllReturn + */ + def subscribe(subscriber: Subscriber[T]): Subscription = { + // Add the casting to avoid compile error "ambiguous reference to overloaded definition" + val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]] + thisJava.subscribe(subscriber.asJavaSubscriber) + } + + /** + * $subscribeSubscriberMain + * + * @param subscriber $subscribeSubscriberParamObserver + * @return $subscribeAllReturn + */ + def apply(subscriber: Subscriber[T]): Subscription = subscribe(subscriber) + /** * $subscribeCallbacksMainNoNotifications * @@ -2405,8 +2460,7 @@ trait Observable[+T] /** * Perform work in parallel by sharding an `Observable[T]` on a - * [[rx.lang.scala.concurrency.Schedulers.threadPoolForComputation computation]] - * [[rx.lang.scala.Scheduler]] and return an `Observable[R]` with the output. + * [[rx.lang.scala.schedulers.ComputationScheduler]] and return an `Observable[R]` with the output. * * @param f * a function that applies Observable operators to `Observable[T]` in parallel and returns an `Observable[R]` @@ -2636,12 +2690,10 @@ trait Observable[+T] * those emitted by the source Observable * @throws IndexOutOfBoundsException * if index is greater than or equal to the number of items emitted by the source - * Observable - * @throws IndexOutOfBoundsException - * if index is less than 0 + * Observable, or index is less than 0 * @see `Observable.elementAt` - * @deprecated("Use `elementAt`", "0.18.0") */ + @deprecated("Use `elementAt`", "0.18.0") def apply(index: Int): Observable[T] = elementAt(index) /** @@ -2656,9 +2708,7 @@ trait Observable[+T] * those emitted by the source Observable * @throws IndexOutOfBoundsException * if index is greater than or equal to the number of items emitted by the source - * Observable - * @throws IndexOutOfBoundsException - * if index is less than 0 + * Observable, or index is less than 0 */ def elementAt(index: Int): Observable[T] = { toScalaObservable[T](asJavaObservable.elementAt(index)) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/CompositeSubscription.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/CompositeSubscription.scala index abe2e721c4..dcd0ca5c39 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/CompositeSubscription.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/CompositeSubscription.scala @@ -50,9 +50,9 @@ class CompositeSubscription private[scala] (override val asJavaSubscription: rx. /** * Adds a subscription to the group, - * or unsubscribes immediately is the [[rx.subscriptions.CompositeSubscription]] is unsubscribed. + * or unsubscribes immediately is the [[rx.lang.scala.subscriptions.CompositeSubscription]] is unsubscribed. * @param subscription the subscription to be added. - * @return the [[rx.subscriptions.CompositeSubscription]] itself. + * @return the [[rx.lang.scala.subscriptions.CompositeSubscription]] itself. */ def +=(subscription: Subscription): this.type = { asJavaSubscription.add(subscription.asJavaSubscription) @@ -62,7 +62,7 @@ class CompositeSubscription private[scala] (override val asJavaSubscription: rx. /** * Removes and unsubscribes a subscription to the group, * @param subscription the subscription be removed. - * @return the [[rx.subscriptions.CompositeSubscription]] itself. + * @return the [[rx.lang.scala.subscriptions.CompositeSubscription]] itself. */ def -=(subscription: Subscription): this.type = { asJavaSubscription.remove(subscription.asJavaSubscription) diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriberTests.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriberTests.scala index f5af3dafca..d1c03227fe 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriberTests.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/SubscriberTests.scala @@ -17,6 +17,7 @@ package rx.lang.scala import org.junit.Test import org.junit.Assert.assertNotNull +import org.junit.Assert.assertTrue import org.scalatest.junit.JUnitSuite class SubscriberTests extends JUnitSuite { @@ -29,4 +30,16 @@ class SubscriberTests extends JUnitSuite { assertNotNull(subscriber.asJavaSubscriber) } + @Test def testUnsubscribeForSubscriber() { + var innerSubscriber: Subscriber[Int] = null + val o = Observable[Int](subscriber => { + Observable[Int](subscriber => { + innerSubscriber = subscriber + }).subscribe(subscriber) + }) + o.subscribe().unsubscribe() + // If we unsubscribe outside, the inner Subscriber should also be unsubscribed + assertTrue(innerSubscriber.isUnsubscribed) + } + } From 423395b5ef737b3a3e118e7106cf6537763e58be Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 11 May 2014 23:58:17 +0800 Subject: [PATCH 3/6] Update CompletenessTest.scala --- .../main/scala/rx/lang/scala/Observable.scala | 2 +- .../rx/lang/scala/CompletenessTest.scala | 37 +++++++++---------- 2 files changed, 18 insertions(+), 21 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index a3924d3e56..1fb22b90a4 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -342,7 +342,7 @@ trait Observable[+T] * * A well-behaved Observable does not interleave its invocations of the [[rx.lang.scala.Observer.onNext onNext]], [[rx.lang.scala.Observer.onCompleted onCompleted]], and [[rx.lang.scala.Observer.onError onError]] methods of * its [[rx.lang.scala.Observer]]s; it invokes `onCompleted` or `onError` only once; and it never invokes `onNext` after invoking either `onCompleted` or `onError`. - * `synchronize` enforces this, and the Observable it returns invokes `onNext` and `onCompleted` or `onError` synchronously. + * [[Observable.serialize serialize]] enforces this, and the Observable it returns invokes `onNext` and `onCompleted` or `onError` synchronously. * * @return an Observable that is a chronologically well-behaved version of the source * Observable, and that synchronously notifies its [[rx.lang.scala.Observer]]s diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 6d249289b0..a870652fa3 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -71,17 +71,15 @@ class CompletenessTest extends JUnitSuite { "all(Func1[_ >: T, Boolean])" -> "forall(T => Boolean)", "buffer(Long, Long, TimeUnit)" -> "buffer(Duration, Duration)", "buffer(Long, Long, TimeUnit, Scheduler)" -> "buffer(Duration, Duration, Scheduler)", - "contains(T)" -> "contains(Any)", + "contains(Any)" -> "contains(U)", "count()" -> "length", "dematerialize()" -> "dematerialize(<:<[Observable[T], Observable[Notification[U]]])", - "elementAt(Int)" -> "[use `.drop(index).first`]", - "elementAtOrDefault(Int, T)" -> "[use `.drop(index).firstOrElse(default)`]", + "elementAtOrDefault(Int, T)" -> "elementAtOrDefault(Int, U)", "first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate, "firstOrDefault(T)" -> "firstOrElse(=> U)", - "firstOrDefault(Func1[_ >: T, Boolean], T)" -> "[use `.filter(condition).firstOrElse(default)`]", + "firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]", "groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]", "lift(Operator[_ <: R, _ >: T])" -> "lift(Subscriber[R] => Subscriber[T])", - "mapMany(Func1[_ >: T, _ <: Observable[_ <: R]])" -> "flatMap(T => Observable[R])", "mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]", "onErrorResumeNext(Func1[Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])", "onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Observable[U])", @@ -95,6 +93,7 @@ class CompletenessTest extends JUnitSuite { "publish(Func1[_ >: Observable[T], _ <: Observable[R]], T)" -> "publish(Observable[U] => Observable[R], U)", "reduce(Func2[T, T, T])" -> "reduce((U, U) => U)", "reduce(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)", + "repeat()" -> "repeat()", "retry()" -> "retry()", "scan(Func2[T, T, T])" -> unnecessary, "scan(R, Func2[R, _ >: T, R])" -> "scan(R)((R, T) => R)", @@ -113,8 +112,8 @@ class CompletenessTest extends JUnitSuite { "skipLast(Int)" -> "dropRight(Int)", "skipLast(Long, TimeUnit)" -> "dropRight(Duration)", "skipLast(Long, TimeUnit, Scheduler)" -> "dropRight(Duration, Scheduler)", - "takeFirst()" -> "first", - "takeFirst(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate, + "subscribe()" -> "subscribe()", + "takeFirst(Func1[_ >: T, Boolean])" -> "[use `filter(condition).take(1)`]", "takeLast(Int)" -> "takeRight(Int)", "takeWhileWithIndex(Func2[_ >: T, _ >: Integer, Boolean])" -> "[use `.zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)`]", "timeout(Func0[_ <: Observable[U]], Func1[_ >: T, _ <: Observable[V]], Observable[_ <: T])" -> "timeout(() => Observable[U], T => Observable[V], Observable[O])", @@ -126,7 +125,6 @@ class CompletenessTest extends JUnitSuite { "toList()" -> "toSeq", "toSortedList()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]", "toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]", - "where(Func1[_ >: T, Boolean])" -> "filter(T => Boolean)", "window(Long, Long, TimeUnit)" -> "window(Duration, Duration)", "window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)", @@ -139,28 +137,22 @@ class CompletenessTest extends JUnitSuite { "combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatest(Observable[U])", "concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])", "defer(Func0[_ <: Observable[_ <: T]])" -> "defer(=> Observable[T])", - "empty()" -> "apply(T*)", - "error(Throwable)" -> "apply(Throwable)", - "from(Array[T])" -> "apply(T*)", - "from(Iterable[_ <: T])" -> "apply(T*)", + "from(Array[T])" -> "[use apply(T*)]", + "from(Iterable[_ <: T])" -> "from(Iterable[T])", "from(Future[_ <: T])" -> fromFuture, "from(Future[_ <: T], Long, TimeUnit)" -> fromFuture, "from(Future[_ <: T], Scheduler)" -> fromFuture, - "just(T)" -> "apply(T*)", + "just(T)" -> "[use apply(T*)]", + "just(T, Scheduler)" -> "[use apply(T*).subscribeOn(scheduler)]", "merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[U])", "merge(Observable[_ <: Observable[_ <: T]])" -> "flatten(<:<[Observable[T], Observable[Observable[U]]])", "mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])", "mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])", "range(Int, Int)" -> "apply(Range)", - "repeat()" -> "repeat()", - "retry()" -> "retry()", - "sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "[use `(first zip second) map (p => p._1 == p._2)`]", - "sequenceEqual(Observable[_ <: T], Observable[_ <: T], Func2[_ >: T, _ >: T, Boolean])" -> "[use `(first zip second) map (p => equality(p._1, p._2))`]", "sum(Observable[Integer])" -> "sum(Numeric[U])", "sumDoubles(Observable[Double])" -> "sum(Numeric[U])", "sumFloats(Observable[Float])" -> "sum(Numeric[U])", "sumLongs(Observable[Long])" -> "sum(Numeric[U])", - "synchronize(Observable[T])" -> "synchronize", "switchDo(Observable[_ <: Observable[_ <: T]])" -> deprecated, "switchOnNext(Observable[_ <: Observable[_ <: T]])" -> "switch(<:<[Observable[T], Observable[Observable[U]]])", "zip(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "[use instance method `zip` and `map`]", @@ -174,7 +166,7 @@ class CompletenessTest extends JUnitSuite { "concat(" + _ + ")" -> "[unnecessary because we can use `++` instead or `Observable(o1, o2, ...).concat`]" ).drop(1).toMap ++ List.iterate("T", 10)(s => s + ", T").map( // all 10 overloads of from: - "from(" + _ + ")" -> "apply(T*)" + "from(" + _ + ")" -> "[use apply(T*)]" ).toMap ++ (3 to 9).map(i => { // zip3-9: val obsArgs = (1 to i).map(j => s"Observable[_ <: T$j], ").mkString("") @@ -216,6 +208,8 @@ class CompletenessTest extends JUnitSuite { // TODO how can we filter out instance methods which were put into companion because // of extends AnyVal in a way which does not depend on implementation-chosen name '$extension'? .filter(! _.contains("$extension")) + // `access$000` is public. How to distinguish it from others without hard-code? + .filter(! _.contains("access$000")) } // also applicable for Java types @@ -373,7 +367,10 @@ class CompletenessTest extends JUnitSuite { def escape(s: String) = s.replaceAllLiterally("[", "<").replaceAllLiterally("]", ">") println(""" -## Comparison of Scala Observable and Java Observable +--- +layout: comparison +title: Comparison of Scala Observable and Java Observable +--- Note: * This table contains both static methods and instance methods. From 0f191205830b9176c81ca66cd8d2a9518b096c4e Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 13 May 2014 22:27:42 +0800 Subject: [PATCH 4/6] Update more mappings to CompletenessTest --- .../scala/rx/lang/scala/CompletenessTest.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index a870652fa3..4de06f175a 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -79,6 +79,7 @@ class CompletenessTest extends JUnitSuite { "firstOrDefault(T)" -> "firstOrElse(=> U)", "firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]", "groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]", + "groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])", "lift(Operator[_ <: R, _ >: T])" -> "lift(Subscriber[R] => Subscriber[T])", "mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]", "onErrorResumeNext(Func1[Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])", @@ -133,22 +134,24 @@ class CompletenessTest extends JUnitSuite { "averageDoubles(Observable[Double])" -> averageProblem, "averageFloats(Observable[Float])" -> averageProblem, "averageLongs(Observable[Long])" -> averageProblem, - "create(OnSubscribeFunc[T])" -> "apply(Observer[T] => Subscription)", + "create(OnSubscribeFunc[T])" -> "create(Observer[T] => Subscription)", + "create(OnSubscribe[T])" -> "apply(Subscriber[T] => Unit)", "combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatest(Observable[U])", "concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])", "defer(Func0[_ <: Observable[_ <: T]])" -> "defer(=> Observable[T])", - "from(Array[T])" -> "[use apply(T*)]", + "from(Array[T])" -> "[use `items(T*)`]", "from(Iterable[_ <: T])" -> "from(Iterable[T])", "from(Future[_ <: T])" -> fromFuture, "from(Future[_ <: T], Long, TimeUnit)" -> fromFuture, "from(Future[_ <: T], Scheduler)" -> fromFuture, - "just(T)" -> "[use apply(T*)]", - "just(T, Scheduler)" -> "[use apply(T*).subscribeOn(scheduler)]", + "just(T)" -> "[use `items(T*)`]", + "just(T, Scheduler)" -> "[use `items(T*).subscribeOn(scheduler)`]", "merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[U])", "merge(Observable[_ <: Observable[_ <: T]])" -> "flatten(<:<[Observable[T], Observable[Observable[U]]])", "mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])", "mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])", - "range(Int, Int)" -> "apply(Range)", + "range(Int, Int)" -> "[use `(... until ...).toObservable`]", + "range(Int, Int, Scheduler)" -> "[use `from((... until ...), scheduler)`]", "sum(Observable[Integer])" -> "sum(Numeric[U])", "sumDoubles(Observable[Double])" -> "sum(Numeric[U])", "sumFloats(Observable[Float])" -> "sum(Numeric[U])", @@ -166,7 +169,7 @@ class CompletenessTest extends JUnitSuite { "concat(" + _ + ")" -> "[unnecessary because we can use `++` instead or `Observable(o1, o2, ...).concat`]" ).drop(1).toMap ++ List.iterate("T", 10)(s => s + ", T").map( // all 10 overloads of from: - "from(" + _ + ")" -> "[use apply(T*)]" + "from(" + _ + ")" -> "[use `items(T*)`]" ).toMap ++ (3 to 9).map(i => { // zip3-9: val obsArgs = (1 to i).map(j => s"Observable[_ <: T$j], ").mkString("") From 44efb27764a839e1b5f14e11e9e7573cbf912157 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 14 May 2014 22:07:23 +0800 Subject: [PATCH 5/6] Update range comments --- .../src/test/scala/rx/lang/scala/CompletenessTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 4de06f175a..6d59db0151 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -150,8 +150,8 @@ class CompletenessTest extends JUnitSuite { "merge(Observable[_ <: Observable[_ <: T]])" -> "flatten(<:<[Observable[T], Observable[Observable[U]]])", "mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])", "mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])", - "range(Int, Int)" -> "[use `(... until ...).toObservable`]", - "range(Int, Int, Scheduler)" -> "[use `from((... until ...), scheduler)`]", + "range(Int, Int)" -> "[use `(start until (start + count)).toObservable` instead of `range(start, count)`]", + "range(Int, Int, Scheduler)" -> "[use `(start until (start + count)).toObservable.subscribeOn(scheduler)` instead of `range(start, count, scheduler)`]`]", "sum(Observable[Integer])" -> "sum(Numeric[U])", "sumDoubles(Observable[Double])" -> "sum(Numeric[U])", "sumFloats(Observable[Float])" -> "sum(Numeric[U])", From 039378f784dc42eef051ce7ebcf257879904b928 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 19 May 2014 10:25:17 +0800 Subject: [PATCH 6/6] Fix the initialization order --- .../src/main/scala/rx/lang/scala/Subscriber.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala index 38934742c9..2b5d714076 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala @@ -3,16 +3,16 @@ package rx.lang.scala trait Subscriber[-T] extends Observer[T] with Subscription { self => - - private [scala] override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubscriber - private [scala] override val asJavaSubscription: rx.Subscription = asJavaSubscriber - + private [scala] val asJavaSubscriber: rx.Subscriber[_ >: T] = new rx.Subscriber[T] { def onNext(value: T): Unit = self.onNext(value) def onError(error: Throwable): Unit = self.onError(error) def onCompleted(): Unit = self.onCompleted() } - + + private [scala] override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubscriber + private [scala] override val asJavaSubscription: rx.Subscription = asJavaSubscriber + /** * Used to register an unsubscribe callback. */