Skip to content

Commit

Permalink
Merge pull request #1229 from benjchristensen/1116-ambiguous-overloads
Browse files Browse the repository at this point in the history
Remove Ambiguous Subscribe Overloads with Scheduler
  • Loading branch information
benjchristensen committed May 20, 2014
2 parents 5ecf69a + 7039d62 commit 3d77dc9
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,6 @@ trait Observable[+T]
asJavaObservable.subscribe()
}

/**
* $subscribeObserverMain
*
* @param observer $subscribeObserverParamObserver
* @param scheduler $subscribeObserverParamScheduler
* @return $subscribeAllReturn
*/
def subscribe(observer: Observer[T], scheduler: Scheduler): Subscription = {
asJavaObservable.subscribe(observer.asJavaObserver, scheduler)
}

/**
* $subscribeObserverMain
*
Expand All @@ -147,19 +136,6 @@ trait Observable[+T]
*/
def apply(observer: Observer[T]): Subscription = subscribe(observer)

/**
* $subscribeSubscriberMain
*
* @param subscriber $subscribeSubscriberParamObserver
* @param scheduler $subscribeSubscriberParamScheduler
* @return $subscribeAllReturn
*/
def subscribe(subscriber: Subscriber[T], scheduler: Scheduler): Subscription = {
// Add the casting to avoid compile error "ambiguous reference to overloaded definition"
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
thisJava.subscribe(subscriber.asJavaSubscriber, scheduler)
}

/**
* $subscribeSubscriberMain
*
Expand Down Expand Up @@ -220,48 +196,6 @@ trait Observable[+T]
)
}

/**
* $subscribeCallbacksMainWithNotifications
*
* @param onNext $subscribeCallbacksParamOnNext
* @param onError $subscribeCallbacksParamOnError
* @param onCompleted $subscribeCallbacksParamOnComplete
* @param scheduler $subscribeCallbacksParamScheduler
* @return $subscribeAllReturn
*/
def subscribe(onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit, scheduler: Scheduler): Subscription = {
asJavaObservable.subscribe(scalaFunction1ProducingUnitToAction1(onNext),
scalaFunction1ProducingUnitToAction1(onError),
scalaFunction0ProducingUnitToAction0(onCompleted),
scheduler)
}

/**
* $subscribeCallbacksMainWithNotifications
*
* @param onNext $subscribeCallbacksParamOnNext
* @param onError $subscribeCallbacksParamOnError
* @param scheduler $subscribeCallbacksParamScheduler
* @return $subscribeAllReturn
*/
def subscribe(onNext: T => Unit, onError: Throwable => Unit, scheduler: Scheduler): Subscription = {
asJavaObservable.subscribe(
scalaFunction1ProducingUnitToAction1(onNext),
scalaFunction1ProducingUnitToAction1(onError),
scheduler)
}

/**
* $subscribeCallbacksMainNoNotifications
*
* @param onNext $subscribeCallbacksParamOnNext
* @param scheduler $subscribeCallbacksParamScheduler
* @return $subscribeAllReturn
*/
def subscribe(onNext: T => Unit, scheduler: Scheduler): Subscription = {
asJavaObservable.subscribe(scalaFunction1ProducingUnitToAction1(onNext), scheduler)
}

/**
* Returns a pair of a start function and an [[rx.lang.scala.Observable]] that upon calling the start function causes the source Observable to
* push results into the specified subject.
Expand Down
102 changes: 0 additions & 102 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6026,76 +6026,6 @@ public final void onNext(T args) {
});
}

/**
* An {@link Observer} must call an Observable's {@code subscribe} method in order to receive items and
* notifications from the Observable.
*
* @param onNext
* FIXME FIXME FIXME
* @param onError
* FIXME FIXME FIXME
* @param onComplete
* FIXME FIXME FIXME
* @param scheduler
* FIXME FIXME FIXME
* @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items before
* the Observable has finished sending them
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable#wiki-onnext-oncompleted-and-onerror">RxJava Wiki: onNext, onCompleted, and onError</a>
*/
public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError, onComplete);
}

/**
* An {@link Observer} must call an Observable's {@code subscribe} method in order to receive items and
* notifications from the Observable.
*
* @param onNext
* FIXME FIXME FIXME
* @param onError
* FIXME FIXME FIXME
* @param scheduler
* FIXME FIXME FIXME
* @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items before
* the Observable has finished sending them
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable#wiki-onnext-oncompleted-and-onerror">RxJava Wiki: onNext, onCompleted, and onError</a>
*/
public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError);
}

/**
* An {@link Observer} must call an Observable's {@code subscribe} method in order to receive items and
* notifications from the Observable.
*
* @param onNext
* FIXME FIXME FIXME
* @param scheduler
* FIXME FIXME FIXME
* @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items before
* the Observable has finished sending them
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable#wiki-onnext-oncompleted-and-onerror">RxJava Wiki: onNext, onCompleted, and onError</a>
*/
public final Subscription subscribe(final Action1<? super T> onNext, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext);
}

/**
* An {@link Observer} must subscribe to an Observable in order to receive items and notifications from the
* Observable.
*
* @param observer
* FIXME FIXME FIXME
* @param scheduler
* FIXME FIXME FIXME
* @return a {@link Subscription} reference with which the {@link Observer} can stop receiving items before
* the Observable has finished sending them
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable#wiki-onnext-oncompleted-and-onerror">RxJava Wiki: onNext, onCompleted, and onError</a>
*/
public final Subscription subscribe(final Observer<? super T> observer, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(observer);
}

/**
* An {@link Observer} must subscribe to an Observable in order to receive items and notifications from the
* Observable.
Expand Down Expand Up @@ -6241,38 +6171,6 @@ public void call() {
}
}

/**
* A {@link Subscriber} must call an Observable's {@code subscribe} method in order to receive items and
* notifications from the Observable.
* <p>
* A typical implementation of {@code subscribe} does the following:
* <ol>
* <li>It stores a reference to the Subscriber in a collection object, such as a {@code List<T>} object.</li>
* <li>It returns a reference to the {@link Subscription} interface. This enables Observers to unsubscribe,
* that is, to stop receiving items and notifications before the Observable stops sending them, which also
* invokes the Observer's {@link Observer#onCompleted onCompleted} method.</li>
* </ol><p>
* An {@code Observable<T>} instance is responsible for accepting all subscriptions and notifying all
* Subscribers. Unless the documentation for a particular {@code Observable<T>} implementation indicates
* otherwise, Subscribers should make no assumptions about the order in which multiple Subscribers will
* receive their notifications.
* <p>
* For more information see the
* <a href="https://github.com/Netflix/RxJava/wiki/Observable">RxJava Wiki</a>
*
* @param observer
* the {@link Subscriber}
* @param scheduler
* the {@link Scheduler} on which Subscribers subscribe to the Observable
* @return a {@link Subscription} reference with which Subscribers that are {@link Observer}s can
* unsubscribe from the Observable
* @throws IllegalArgumentException
* if an argument to {@code subscribe()} is {@code null}
*/
public final Subscription subscribe(Subscriber<? super T> observer, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(observer);
}

/**
* Asynchronously subscribes Observers to this Observable on the specified {@link Scheduler}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ public void call(Integer t) {
final CountDownLatch latch = new CountDownLatch(5);
final CountDownLatch first = new CountDownLatch(1);

o1.subscribe(new Action1<Integer>() {
o1.subscribeOn(scheduler).subscribe(new Action1<Integer>() {

@Override
public void call(Integer t) {
Expand All @@ -388,7 +388,7 @@ public void call(Integer t) {
count.incrementAndGet();
latch.countDown();
}
}, scheduler);
});

// assert we are async
assertEquals(0, count.get());
Expand Down

0 comments on commit 3d77dc9

Please sign in to comment.