Skip to content

Commit

Permalink
Remove callSubscribeIfNecessary, avoid duplicate subscribes
Browse files Browse the repository at this point in the history
  • Loading branch information
ZacSweers committed Nov 27, 2017
1 parent 44b362f commit ba8bd87
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.reactivex.CompletableObserver;
import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.observers.DisposableMaybeObserver;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -43,30 +42,26 @@ final class AutoDisposingCompletableObserverImpl implements AutoDisposingComplet
@Override public void onSubscribe(final Disposable d) {
DisposableMaybeObserver<Object> o = new DisposableMaybeObserver<Object>() {
@Override public void onSuccess(Object o) {
callMainSubscribeIfNecessary(d);
AutoDisposableHelper.dispose(mainDisposable);
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
}

@Override public void onError(Throwable e) {
callMainSubscribeIfNecessary(d);
AutoDisposingCompletableObserverImpl.this.onError(e);
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
}

@Override public void onComplete() {
callMainSubscribeIfNecessary(d);
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
// Noop - we're unbound now
}
};
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, o, getClass())) {
delegate.onSubscribe(this);
lifecycle.subscribe(o);
if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) {
delegate.onSubscribe(this);
}
AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass());
}
}

Expand All @@ -79,16 +74,6 @@ final class AutoDisposingCompletableObserverImpl implements AutoDisposingComplet
AutoDisposableHelper.dispose(mainDisposable);
}

@SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors
void callMainSubscribeIfNecessary(Disposable d) {
// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
// to abide by the Observer contract.
if (AutoDisposableHelper.setIfNotSet(mainDisposable, d)) {
delegate.onSubscribe(Disposables.disposed());
}
}

@Override public void onComplete() {
if (!isDisposed()) {
AutoDisposableHelper.dispose(lifecycleDisposable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.reactivex.Maybe;
import io.reactivex.MaybeObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.observers.DisposableMaybeObserver;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -43,30 +42,26 @@ final class AutoDisposingMaybeObserverImpl<T> implements AutoDisposingMaybeObser
@Override public void onSubscribe(final Disposable d) {
DisposableMaybeObserver<Object> o = new DisposableMaybeObserver<Object>() {
@Override public void onSuccess(Object o) {
callMainSubscribeIfNecessary(d);
AutoDisposableHelper.dispose(mainDisposable);
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
}

@Override public void onError(Throwable e) {
callMainSubscribeIfNecessary(d);
AutoDisposingMaybeObserverImpl.this.onError(e);
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
}

@Override public void onComplete() {
callMainSubscribeIfNecessary(d);
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
// Noop - we're unbound now
}
};
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, o, getClass())) {
delegate.onSubscribe(this);
lifecycle.subscribe(o);
if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) {
delegate.onSubscribe(this);
}
AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass());
}
}

Expand All @@ -79,16 +74,6 @@ final class AutoDisposingMaybeObserverImpl<T> implements AutoDisposingMaybeObser
AutoDisposableHelper.dispose(mainDisposable);
}

@SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors
void callMainSubscribeIfNecessary(Disposable d) {
// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
// to abide by the Observer contract.
if (AutoDisposableHelper.setIfNotSet(mainDisposable, d)) {
delegate.onSubscribe(Disposables.disposed());
}
}

@Override public void onSuccess(T value) {
if (!isDisposed()) {
AutoDisposableHelper.dispose(lifecycleDisposable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.reactivex.Maybe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.observers.DisposableMaybeObserver;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -45,30 +44,26 @@ final class AutoDisposingObserverImpl<T> extends AtomicInteger implements AutoDi
@Override public void onSubscribe(final Disposable d) {
DisposableMaybeObserver<Object> o = new DisposableMaybeObserver<Object>() {
@Override public void onSuccess(Object o) {
callMainSubscribeIfNecessary(d);
AutoDisposableHelper.dispose(mainDisposable);
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
}

@Override public void onError(Throwable e) {
callMainSubscribeIfNecessary(d);
AutoDisposingObserverImpl.this.onError(e);
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
}

@Override public void onComplete() {
callMainSubscribeIfNecessary(d);
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
// Noop - we're unbound now
}
};
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, o, getClass())) {
delegate.onSubscribe(this);
lifecycle.subscribe(o);
if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) {
delegate.onSubscribe(this);
}
AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass());
}
}

Expand All @@ -81,16 +76,6 @@ final class AutoDisposingObserverImpl<T> extends AtomicInteger implements AutoDi
AutoDisposableHelper.dispose(mainDisposable);
}

@SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors
void callMainSubscribeIfNecessary(Disposable d) {
// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
// to abide by the Observer contract.
if (AutoDisposableHelper.setIfNotSet(mainDisposable, d)) {
delegate.onSubscribe(Disposables.disposed());
}
}

@Override public void onNext(T value) {
if (!isDisposed()) {
if (HalfSerializer.onNext(delegate, value, this, error)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.reactivex.Maybe;
import io.reactivex.SingleObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.observers.DisposableMaybeObserver;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -43,30 +42,26 @@ final class AutoDisposingSingleObserverImpl<T> implements AutoDisposingSingleObs
@Override public void onSubscribe(final Disposable d) {
DisposableMaybeObserver<Object> o = new DisposableMaybeObserver<Object>() {
@Override public void onSuccess(Object o) {
callMainSubscribeIfNecessary(d);
AutoDisposableHelper.dispose(mainDisposable);
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
}

@Override public void onError(Throwable e) {
callMainSubscribeIfNecessary(d);
AutoDisposingSingleObserverImpl.this.onError(e);
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
}

@Override public void onComplete() {
callMainSubscribeIfNecessary(d);
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
// Noop - we're unbound now
}
};
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, o, getClass())) {
delegate.onSubscribe(this);
lifecycle.subscribe(o);
if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) {
delegate.onSubscribe(this);
}
AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass());
}
}

Expand All @@ -79,16 +74,6 @@ final class AutoDisposingSingleObserverImpl<T> implements AutoDisposingSingleObs
AutoDisposableHelper.dispose(mainDisposable);
}

@SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors
void callMainSubscribeIfNecessary(Disposable d) {
// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
// to abide by the Observer contract.
if (AutoDisposableHelper.setIfNotSet(mainDisposable, d)) {
delegate.onSubscribe(Disposables.disposed());
}
}

@Override public void onSuccess(T value) {
if (!isDisposed()) {
AutoDisposableHelper.dispose(lifecycleDisposable);
Expand Down

0 comments on commit ba8bd87

Please sign in to comment.