Skip to content

Commit

Permalink
Call delegate.onSubscribe() first, remove callSubscribeIfNecessary
Browse files Browse the repository at this point in the history
  • Loading branch information
ZacSweers committed Nov 27, 2017
1 parent 500f337 commit e7e1b87
Showing 1 changed file with 1 addition and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.uber.autodispose.observers.AutoDisposingSubscriber;
import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.subscriptions.EmptySubscription;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.observers.DisposableMaybeObserver;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -51,26 +50,24 @@ final class AutoDisposingSubscriberImpl<T> extends AtomicInteger
@Override public void onSubscribe(final Subscription s) {
DisposableMaybeObserver<Object> o = new DisposableMaybeObserver<Object>() {
@Override public void onSuccess(Object o) {
callMainSubscribeIfNecessary(s);
AutoSubscriptionHelper.cancel(mainSubscription);
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
}

@Override public void onError(Throwable e) {
callMainSubscribeIfNecessary(s);
AutoDisposingSubscriberImpl.this.onError(e);
mainSubscription.lazySet(AutoSubscriptionHelper.CANCELLED);
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
}

@Override public void onComplete() {
callMainSubscribeIfNecessary(s);
mainSubscription.lazySet(AutoSubscriptionHelper.CANCELLED);
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
// Noop - we're unbound now
}
};
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, o, getClass())) {
delegate.onSubscribe(this);
lifecycle.subscribe(o);
if (AutoDisposeEndConsumerHelper.setOnce(mainSubscription, s, getClass())) {
SubscriptionHelper.deferredSetOnce(ref, requested, s);
Expand Down Expand Up @@ -101,16 +98,6 @@ final class AutoDisposingSubscriberImpl<T> extends AtomicInteger
AutoSubscriptionHelper.cancel(mainSubscription);
}

@SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors
void callMainSubscribeIfNecessary(Subscription s) {
// If we've never actually started the upstream subscription (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty subscription instance
// to abide by the Subscriber contract.
if (AutoSubscriptionHelper.setIfNotSet(mainSubscription, s)) {
delegate.onSubscribe(EmptySubscription.INSTANCE);
}
}

@Override public boolean isDisposed() {
return mainSubscription.get() == AutoSubscriptionHelper.CANCELLED;
}
Expand Down

0 comments on commit e7e1b87

Please sign in to comment.