From 7039d62b46075559130be3221b66677c8aa635c9 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 20 May 2014 10:47:14 -0700 Subject: [PATCH] Remove Ambiguous Subscribe Overloads with Scheduler - Fixes https://github.com/Netflix/RxJava/issues/1116 - These should never have been added, the subscribeOn operator already provides this functionality --- .../main/scala/rx/lang/scala/Observable.scala | 66 ------------ rxjava-core/src/main/java/rx/Observable.java | 102 ------------------ .../AbstractSchedulerConcurrencyTests.java | 4 +- 3 files changed, 2 insertions(+), 170 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index d144d1884b..c16f9d5af7 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -118,17 +118,6 @@ trait Observable[+T] asJavaObservable.subscribe() } - /** - * $subscribeObserverMain - * - * @param observer $subscribeObserverParamObserver - * @param scheduler $subscribeObserverParamScheduler - * @return $subscribeAllReturn - */ - def subscribe(observer: Observer[T], scheduler: Scheduler): Subscription = { - asJavaObservable.subscribe(observer.asJavaObserver, scheduler) - } - /** * $subscribeObserverMain * @@ -147,19 +136,6 @@ trait Observable[+T] */ def apply(observer: Observer[T]): Subscription = subscribe(observer) - /** - * $subscribeSubscriberMain - * - * @param subscriber $subscribeSubscriberParamObserver - * @param scheduler $subscribeSubscriberParamScheduler - * @return $subscribeAllReturn - */ - def subscribe(subscriber: Subscriber[T], scheduler: Scheduler): Subscription = { - // Add the casting to avoid compile error "ambiguous reference to overloaded definition" - val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]] - thisJava.subscribe(subscriber.asJavaSubscriber, scheduler) - } - /** * $subscribeSubscriberMain * @@ -220,48 +196,6 @@ trait Observable[+T] ) } - /** - * $subscribeCallbacksMainWithNotifications - * - * @param onNext $subscribeCallbacksParamOnNext - * @param onError $subscribeCallbacksParamOnError - * @param onCompleted $subscribeCallbacksParamOnComplete - * @param scheduler $subscribeCallbacksParamScheduler - * @return $subscribeAllReturn - */ - def subscribe(onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit, scheduler: Scheduler): Subscription = { - asJavaObservable.subscribe(scalaFunction1ProducingUnitToAction1(onNext), - scalaFunction1ProducingUnitToAction1(onError), - scalaFunction0ProducingUnitToAction0(onCompleted), - scheduler) - } - - /** - * $subscribeCallbacksMainWithNotifications - * - * @param onNext $subscribeCallbacksParamOnNext - * @param onError $subscribeCallbacksParamOnError - * @param scheduler $subscribeCallbacksParamScheduler - * @return $subscribeAllReturn - */ - def subscribe(onNext: T => Unit, onError: Throwable => Unit, scheduler: Scheduler): Subscription = { - asJavaObservable.subscribe( - scalaFunction1ProducingUnitToAction1(onNext), - scalaFunction1ProducingUnitToAction1(onError), - scheduler) - } - - /** - * $subscribeCallbacksMainNoNotifications - * - * @param onNext $subscribeCallbacksParamOnNext - * @param scheduler $subscribeCallbacksParamScheduler - * @return $subscribeAllReturn - */ - def subscribe(onNext: T => Unit, scheduler: Scheduler): Subscription = { - asJavaObservable.subscribe(scalaFunction1ProducingUnitToAction1(onNext), scheduler) - } - /** * Returns a pair of a start function and an [[rx.lang.scala.Observable]] that upon calling the start function causes the source Observable to * push results into the specified subject. diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index c0e9463ebe..b778d033c2 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -6026,76 +6026,6 @@ public final void onNext(T args) { }); } - /** - * An {@link Observer} must call an Observable's {@code subscribe} method in order to receive items and - * notifications from the Observable. - * - * @param onNext - * FIXME FIXME FIXME - * @param onError - * FIXME FIXME FIXME - * @param onComplete - * FIXME FIXME FIXME - * @param scheduler - * FIXME FIXME FIXME - * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items before - * the Observable has finished sending them - * @see RxJava Wiki: onNext, onCompleted, and onError - */ - public final Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete, Scheduler scheduler) { - return subscribeOn(scheduler).subscribe(onNext, onError, onComplete); - } - - /** - * An {@link Observer} must call an Observable's {@code subscribe} method in order to receive items and - * notifications from the Observable. - * - * @param onNext - * FIXME FIXME FIXME - * @param onError - * FIXME FIXME FIXME - * @param scheduler - * FIXME FIXME FIXME - * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items before - * the Observable has finished sending them - * @see RxJava Wiki: onNext, onCompleted, and onError - */ - public final Subscription subscribe(final Action1 onNext, final Action1 onError, Scheduler scheduler) { - return subscribeOn(scheduler).subscribe(onNext, onError); - } - - /** - * An {@link Observer} must call an Observable's {@code subscribe} method in order to receive items and - * notifications from the Observable. - * - * @param onNext - * FIXME FIXME FIXME - * @param scheduler - * FIXME FIXME FIXME - * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items before - * the Observable has finished sending them - * @see RxJava Wiki: onNext, onCompleted, and onError - */ - public final Subscription subscribe(final Action1 onNext, Scheduler scheduler) { - return subscribeOn(scheduler).subscribe(onNext); - } - - /** - * An {@link Observer} must subscribe to an Observable in order to receive items and notifications from the - * Observable. - * - * @param observer - * FIXME FIXME FIXME - * @param scheduler - * FIXME FIXME FIXME - * @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items before - * the Observable has finished sending them - * @see RxJava Wiki: onNext, onCompleted, and onError - */ - public final Subscription subscribe(final Observer observer, Scheduler scheduler) { - return subscribeOn(scheduler).subscribe(observer); - } - /** * An {@link Observer} must subscribe to an Observable in order to receive items and notifications from the * Observable. @@ -6241,38 +6171,6 @@ public void call() { } } - /** - * A {@link Subscriber} must call an Observable's {@code subscribe} method in order to receive items and - * notifications from the Observable. - *

- * A typical implementation of {@code subscribe} does the following: - *

    - *
  1. It stores a reference to the Subscriber in a collection object, such as a {@code List} object.
  2. - *
  3. It returns a reference to the {@link Subscription} interface. This enables Observers to unsubscribe, - * that is, to stop receiving items and notifications before the Observable stops sending them, which also - * invokes the Observer's {@link Observer#onCompleted onCompleted} method.
  4. - *

- * An {@code Observable} instance is responsible for accepting all subscriptions and notifying all - * Subscribers. Unless the documentation for a particular {@code Observable} implementation indicates - * otherwise, Subscribers should make no assumptions about the order in which multiple Subscribers will - * receive their notifications. - *

- * For more information see the - * RxJava Wiki - * - * @param observer - * the {@link Subscriber} - * @param scheduler - * the {@link Scheduler} on which Subscribers subscribe to the Observable - * @return a {@link Subscription} reference with which Subscribers that are {@link Observer}s can - * unsubscribe from the Observable - * @throws IllegalArgumentException - * if an argument to {@code subscribe()} is {@code null} - */ - public final Subscription subscribe(Subscriber observer, Scheduler scheduler) { - return subscribeOn(scheduler).subscribe(observer); - } - /** * Asynchronously subscribes Observers to this Observable on the specified {@link Scheduler}. *

diff --git a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java index 332b9c257b..fd81a2d5fb 100644 --- a/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java +++ b/rxjava-core/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java @@ -371,7 +371,7 @@ public void call(Integer t) { final CountDownLatch latch = new CountDownLatch(5); final CountDownLatch first = new CountDownLatch(1); - o1.subscribe(new Action1() { + o1.subscribeOn(scheduler).subscribe(new Action1() { @Override public void call(Integer t) { @@ -388,7 +388,7 @@ public void call(Integer t) { count.incrementAndGet(); latch.countDown(); } - }, scheduler); + }); // assert we are async assertEquals(0, count.get());