diff --git a/android/autodispose-android-archcomponents/src/main/java/com/uber/autodispose/android/lifecycle/LifecycleEventsObservable.java b/android/autodispose-android-archcomponents/src/main/java/com/uber/autodispose/android/lifecycle/LifecycleEventsObservable.java index 3efcd4a8d..5d00eafe1 100644 --- a/android/autodispose-android-archcomponents/src/main/java/com/uber/autodispose/android/lifecycle/LifecycleEventsObservable.java +++ b/android/autodispose-android-archcomponents/src/main/java/com/uber/autodispose/android/lifecycle/LifecycleEventsObservable.java @@ -75,15 +75,18 @@ void backfillEvents() { } @Override protected void subscribeActual(Observer observer) { + ArchLifecycleObserver archObserver = + new ArchLifecycleObserver(lifecycle, observer, eventsObservable); + observer.onSubscribe(archObserver); if (!isMainThread()) { observer.onError( new IllegalStateException("Lifecycles can only be bound to on the main thread!")); return; } - ArchLifecycleObserver archObserver = - new ArchLifecycleObserver(lifecycle, observer, eventsObservable); - observer.onSubscribe(archObserver); lifecycle.addObserver(archObserver); + if (archObserver.isDisposed()) { + lifecycle.removeObserver(archObserver); + } } static final class ArchLifecycleObserver extends MainThreadDisposable diff --git a/android/autodispose-android/src/main/java/com/uber/autodispose/android/ViewAttachEventsObservable.java b/android/autodispose-android/src/main/java/com/uber/autodispose/android/ViewAttachEventsObservable.java index c95193664..17beef1fb 100644 --- a/android/autodispose-android/src/main/java/com/uber/autodispose/android/ViewAttachEventsObservable.java +++ b/android/autodispose-android/src/main/java/com/uber/autodispose/android/ViewAttachEventsObservable.java @@ -37,6 +37,8 @@ final class ViewAttachEventsObservable extends Observable { } @Override protected void subscribeActual(Observer observer) { + Listener listener = new Listener(view, observer); + observer.onSubscribe(listener); if (!isMainThread()) { observer.onError(new IllegalStateException("Views can only be bound to on the main thread!")); return; @@ -46,9 +48,10 @@ final class ViewAttachEventsObservable extends Observable { // Emit the last event, like a behavior subject observer.onNext(ViewLifecycleEvent.ATTACH); } - Listener listener = new Listener(view, observer); - observer.onSubscribe(listener); view.addOnAttachStateChangeListener(listener); + if (listener.isDisposed()) { + view.removeOnAttachStateChangeListener(listener); + } } static final class Listener extends MainThreadDisposable diff --git a/autodispose/src/main/java/com/uber/autodispose/AtomicThrowable.java b/autodispose/src/main/java/com/uber/autodispose/AtomicThrowable.java new file mode 100644 index 000000000..132472c1f --- /dev/null +++ b/autodispose/src/main/java/com/uber/autodispose/AtomicThrowable.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.uber.autodispose; + +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; + +/** + * Atomic container for Throwables including combining and having a + * terminal state via ExceptionHelper. + *

+ * Watch out for the leaked AtomicReference methods! + */ +final class AtomicThrowable extends AtomicReference { + + private static final long serialVersionUID = 3949248817947090603L; + + /** + * Atomically adds a Throwable to this container (combining with a previous Throwable is + * necessary). + * + * @param t the throwable to add + * @return true if successful, false if the container has been terminated + */ + public boolean addThrowable(Throwable t) { + return ExceptionHelper.addThrowable(this, t); + } + + /** + * Atomically terminate the container and return the contents of the last + * non-terminal Throwable of it. + * + * @return the last Throwable + */ + @Nullable + public Throwable terminate() { + return ExceptionHelper.terminate(this); + } + + public boolean isTerminated() { + return get() == ExceptionHelper.TERMINATED; + } +} diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java b/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java index a9074c605..bfec1253d 100644 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java @@ -166,9 +166,9 @@ public interface ScopeHandler { return new LifecycleScopeProviderHandlerImpl(scope); } - private static class MaybeScopeHandlerImpl implements ScopeHandler { + private static final class MaybeScopeHandlerImpl implements ScopeHandler { - private final Maybe scope; + final Maybe scope; MaybeScopeHandlerImpl(Maybe scope) { this.scope = scope; @@ -196,9 +196,9 @@ public Function, ObservableSubscribeProxy> forObs } } - private static class ScopeProviderHandlerImpl implements ScopeHandler { + private static final class ScopeProviderHandlerImpl implements ScopeHandler { - private final ScopeProvider scope; + final ScopeProvider scope; ScopeProviderHandlerImpl(ScopeProvider scope) { this.scope = scope; @@ -226,9 +226,9 @@ public Function, ObservableSubscribeProxy> forObs } } - private static class LifecycleScopeProviderHandlerImpl implements ScopeHandler { + private static final class LifecycleScopeProviderHandlerImpl implements ScopeHandler { - private final LifecycleScopeProvider scope; + final LifecycleScopeProvider scope; LifecycleScopeProviderHandlerImpl(LifecycleScopeProvider scope) { this.scope = scope; diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java index a122dbd63..7d0b36dca 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java @@ -20,7 +20,6 @@ import io.reactivex.CompletableObserver; import io.reactivex.Maybe; import io.reactivex.disposables.Disposable; -import io.reactivex.disposables.Disposables; import io.reactivex.observers.DisposableMaybeObserver; import java.util.concurrent.atomic.AtomicReference; @@ -41,27 +40,27 @@ final class AutoDisposingCompletableObserverImpl implements AutoDisposingComplet } @Override public void onSubscribe(final Disposable d) { - if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, - lifecycle.subscribeWith(new DisposableMaybeObserver() { - @Override public void onSuccess(Object o) { - callMainSubscribeIfNecessary(d); - AutoDisposingCompletableObserverImpl.this.dispose(); - } + DisposableMaybeObserver o = new DisposableMaybeObserver() { + @Override public void onSuccess(Object o) { + lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoDisposableHelper.dispose(mainDisposable); + } - @Override public void onError(Throwable e) { - callMainSubscribeIfNecessary(d); - AutoDisposingCompletableObserverImpl.this.onError(e); - } + @Override public void onError(Throwable e) { + lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoDisposingCompletableObserverImpl.this.onError(e); + } - @Override public void onComplete() { - callMainSubscribeIfNecessary(d); - // Noop - we're unbound now - } - }), - getClass())) { - if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) { - delegate.onSubscribe(this); + @Override public void onComplete() { + mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); + lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED); + // Noop - we're unbound now } + }; + if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, o, getClass())) { + delegate.onSubscribe(this); + lifecycle.subscribe(o); + AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass()); } } @@ -70,39 +69,22 @@ final class AutoDisposingCompletableObserverImpl implements AutoDisposingComplet } @Override public void dispose() { - synchronized (this) { - AutoDisposableHelper.dispose(lifecycleDisposable); - AutoDisposableHelper.dispose(mainDisposable); - } - } - - private void lazyDispose() { - synchronized (this) { - AutoDisposableHelper.dispose(lifecycleDisposable); - mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); - } - } - - @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors - void callMainSubscribeIfNecessary(Disposable d) { - // If we've never actually called the downstream onSubscribe (i.e. requested immediately in - // onSubscribe and had a terminal event), we need to still send an empty disposable instance - // to abide by the Observer contract. - if (AutoDisposableHelper.setIfNotSet(mainDisposable, d)) { - delegate.onSubscribe(Disposables.disposed()); - } + AutoDisposableHelper.dispose(lifecycleDisposable); + AutoDisposableHelper.dispose(mainDisposable); } @Override public void onComplete() { if (!isDisposed()) { - lazyDispose(); + mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoDisposableHelper.dispose(lifecycleDisposable); delegate.onComplete(); } } @Override public void onError(Throwable e) { if (!isDisposed()) { - lazyDispose(); + mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoDisposableHelper.dispose(lifecycleDisposable); delegate.onError(e); } } diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java index f9bf50501..73010a7d5 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java @@ -20,7 +20,6 @@ import io.reactivex.Maybe; import io.reactivex.MaybeObserver; import io.reactivex.disposables.Disposable; -import io.reactivex.disposables.Disposables; import io.reactivex.observers.DisposableMaybeObserver; import java.util.concurrent.atomic.AtomicReference; @@ -41,27 +40,27 @@ final class AutoDisposingMaybeObserverImpl implements AutoDisposingMaybeObser } @Override public void onSubscribe(final Disposable d) { - if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, - lifecycle.subscribeWith(new DisposableMaybeObserver() { - @Override public void onSuccess(Object o) { - callMainSubscribeIfNecessary(d); - AutoDisposingMaybeObserverImpl.this.dispose(); - } + DisposableMaybeObserver o = new DisposableMaybeObserver() { + @Override public void onSuccess(Object o) { + lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoDisposableHelper.dispose(mainDisposable); + } - @Override public void onError(Throwable e) { - callMainSubscribeIfNecessary(d); - AutoDisposingMaybeObserverImpl.this.onError(e); - } + @Override public void onError(Throwable e) { + lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoDisposingMaybeObserverImpl.this.onError(e); + } - @Override public void onComplete() { - callMainSubscribeIfNecessary(d); - // Noop - we're unbound now - } - }), - getClass())) { - if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) { - delegate.onSubscribe(this); + @Override public void onComplete() { + mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); + lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED); + // Noop - we're unbound now } + }; + if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, o, getClass())) { + delegate.onSubscribe(this); + lifecycle.subscribe(o); + AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass()); } } @@ -70,46 +69,30 @@ final class AutoDisposingMaybeObserverImpl implements AutoDisposingMaybeObser } @Override public void dispose() { - synchronized (this) { - AutoDisposableHelper.dispose(lifecycleDisposable); - AutoDisposableHelper.dispose(mainDisposable); - } - } - - private void lazyDispose() { - synchronized (this) { - AutoDisposableHelper.dispose(lifecycleDisposable); - mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); - } - } - - @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors - void callMainSubscribeIfNecessary(Disposable d) { - // If we've never actually called the downstream onSubscribe (i.e. requested immediately in - // onSubscribe and had a terminal event), we need to still send an empty disposable instance - // to abide by the Observer contract. - if (AutoDisposableHelper.setIfNotSet(mainDisposable, d)) { - delegate.onSubscribe(Disposables.disposed()); - } + AutoDisposableHelper.dispose(lifecycleDisposable); + AutoDisposableHelper.dispose(mainDisposable); } @Override public void onSuccess(T value) { if (!isDisposed()) { - lazyDispose(); + mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoDisposableHelper.dispose(lifecycleDisposable); delegate.onSuccess(value); } } @Override public void onError(Throwable e) { if (!isDisposed()) { - lazyDispose(); + mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoDisposableHelper.dispose(lifecycleDisposable); delegate.onError(e); } } @Override public void onComplete() { if (!isDisposed()) { - lazyDispose(); + mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoDisposableHelper.dispose(lifecycleDisposable); delegate.onComplete(); } } diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java index 82ddd5373..8aed379df 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java @@ -20,14 +20,15 @@ import io.reactivex.Maybe; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; -import io.reactivex.disposables.Disposables; import io.reactivex.observers.DisposableMaybeObserver; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -final class AutoDisposingObserverImpl implements AutoDisposingObserver { +final class AutoDisposingObserverImpl extends AtomicInteger implements AutoDisposingObserver { private final AtomicReference mainDisposable = new AtomicReference<>(); private final AtomicReference lifecycleDisposable = new AtomicReference<>(); + private final AtomicThrowable error = new AtomicThrowable(); private final Maybe lifecycle; private final Observer delegate; @@ -41,27 +42,27 @@ final class AutoDisposingObserverImpl implements AutoDisposingObserver { } @Override public void onSubscribe(final Disposable d) { - if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, - lifecycle.subscribeWith(new DisposableMaybeObserver() { - @Override public void onSuccess(Object o) { - callMainSubscribeIfNecessary(d); - AutoDisposingObserverImpl.this.dispose(); - } + DisposableMaybeObserver o = new DisposableMaybeObserver() { + @Override public void onSuccess(Object o) { + lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoDisposableHelper.dispose(mainDisposable); + } - @Override public void onError(Throwable e) { - callMainSubscribeIfNecessary(d); - AutoDisposingObserverImpl.this.onError(e); - } + @Override public void onError(Throwable e) { + lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoDisposingObserverImpl.this.onError(e); + } - @Override public void onComplete() { - callMainSubscribeIfNecessary(d); - // Noop - we're unbound now - } - }), - getClass())) { - if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) { - delegate.onSubscribe(this); + @Override public void onComplete() { + mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); + lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED); + // Noop - we're unbound now } + }; + if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, o, getClass())) { + delegate.onSubscribe(this); + lifecycle.subscribe(o); + AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass()); } } @@ -70,46 +71,33 @@ final class AutoDisposingObserverImpl implements AutoDisposingObserver { } @Override public void dispose() { - synchronized (this) { - AutoDisposableHelper.dispose(lifecycleDisposable); - AutoDisposableHelper.dispose(mainDisposable); - } - } - - private void lazyDispose() { - synchronized (this) { - AutoDisposableHelper.dispose(lifecycleDisposable); - mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); - } - } - - @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors - void callMainSubscribeIfNecessary(Disposable d) { - // If we've never actually called the downstream onSubscribe (i.e. requested immediately in - // onSubscribe and had a terminal event), we need to still send an empty disposable instance - // to abide by the Observer contract. - if (AutoDisposableHelper.setIfNotSet(mainDisposable, d)) { - delegate.onSubscribe(Disposables.disposed()); - } + AutoDisposableHelper.dispose(lifecycleDisposable); + AutoDisposableHelper.dispose(mainDisposable); } @Override public void onNext(T value) { if (!isDisposed()) { - delegate.onNext(value); + if (HalfSerializer.onNext(delegate, value, this, error)) { + // Terminal event occurred and was forwarded to the delegate, so clean up here + mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoDisposableHelper.dispose(lifecycleDisposable); + } } } @Override public void onError(Throwable e) { if (!isDisposed()) { - lazyDispose(); - delegate.onError(e); + mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoDisposableHelper.dispose(lifecycleDisposable); + HalfSerializer.onError(delegate, e, this, error); } } @Override public void onComplete() { if (!isDisposed()) { - lazyDispose(); - delegate.onComplete(); + mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoDisposableHelper.dispose(lifecycleDisposable); + HalfSerializer.onComplete(delegate, this, error); } } } diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java index 3aee9157b..324626a42 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java @@ -20,7 +20,6 @@ import io.reactivex.Maybe; import io.reactivex.SingleObserver; import io.reactivex.disposables.Disposable; -import io.reactivex.disposables.Disposables; import io.reactivex.observers.DisposableMaybeObserver; import java.util.concurrent.atomic.AtomicReference; @@ -41,27 +40,27 @@ final class AutoDisposingSingleObserverImpl implements AutoDisposingSingleObs } @Override public void onSubscribe(final Disposable d) { - if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, - lifecycle.subscribeWith(new DisposableMaybeObserver() { - @Override public void onSuccess(Object o) { - callMainSubscribeIfNecessary(d); - AutoDisposingSingleObserverImpl.this.dispose(); - } + DisposableMaybeObserver o = new DisposableMaybeObserver() { + @Override public void onSuccess(Object o) { + lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoDisposableHelper.dispose(mainDisposable); + } - @Override public void onError(Throwable e) { - callMainSubscribeIfNecessary(d); - AutoDisposingSingleObserverImpl.this.onError(e); - } + @Override public void onError(Throwable e) { + lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoDisposingSingleObserverImpl.this.onError(e); + } - @Override public void onComplete() { - callMainSubscribeIfNecessary(d); - // Noop - we're unbound now - } - }), - getClass())) { - if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) { - delegate.onSubscribe(this); + @Override public void onComplete() { + mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); + lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED); + // Noop - we're unbound now } + }; + if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, o, getClass())) { + delegate.onSubscribe(this); + lifecycle.subscribe(o); + AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass()); } } @@ -70,39 +69,22 @@ final class AutoDisposingSingleObserverImpl implements AutoDisposingSingleObs } @Override public void dispose() { - synchronized (this) { - AutoDisposableHelper.dispose(lifecycleDisposable); - AutoDisposableHelper.dispose(mainDisposable); - } - } - - private void lazyDispose() { - synchronized (this) { - AutoDisposableHelper.dispose(lifecycleDisposable); - mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); - } - } - - @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors - void callMainSubscribeIfNecessary(Disposable d) { - // If we've never actually called the downstream onSubscribe (i.e. requested immediately in - // onSubscribe and had a terminal event), we need to still send an empty disposable instance - // to abide by the Observer contract. - if (AutoDisposableHelper.setIfNotSet(mainDisposable, d)) { - delegate.onSubscribe(Disposables.disposed()); - } + AutoDisposableHelper.dispose(lifecycleDisposable); + AutoDisposableHelper.dispose(mainDisposable); } @Override public void onSuccess(T value) { if (!isDisposed()) { - lazyDispose(); + mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoDisposableHelper.dispose(lifecycleDisposable); delegate.onSuccess(value); } } @Override public void onError(Throwable e) { if (!isDisposed()) { - lazyDispose(); + mainDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoDisposableHelper.dispose(lifecycleDisposable); delegate.onError(e); } } diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java index 45647d581..bba5334da 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java @@ -19,16 +19,21 @@ import com.uber.autodispose.observers.AutoDisposingSubscriber; import io.reactivex.Maybe; import io.reactivex.disposables.Disposable; -import io.reactivex.internal.subscriptions.EmptySubscription; import io.reactivex.observers.DisposableMaybeObserver; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -final class AutoDisposingSubscriberImpl implements AutoDisposingSubscriber { +final class AutoDisposingSubscriberImpl extends AtomicInteger + implements AutoDisposingSubscriber { private final AtomicReference mainSubscription = new AtomicReference<>(); private final AtomicReference lifecycleDisposable = new AtomicReference<>(); + private final AtomicThrowable error = new AtomicThrowable(); + private final AtomicReference ref = new AtomicReference<>(); + private final AtomicLong requested = new AtomicLong(); private final Maybe lifecycle; private final Subscriber delegate; @@ -42,26 +47,28 @@ final class AutoDisposingSubscriberImpl implements AutoDisposingSubscriber } @Override public void onSubscribe(final Subscription s) { - if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, - lifecycle.subscribeWith(new DisposableMaybeObserver() { - @Override public void onSuccess(Object o) { - callMainSubscribeIfNecessary(s); - AutoDisposingSubscriberImpl.this.dispose(); - } - - @Override public void onError(Throwable e) { - callMainSubscribeIfNecessary(s); - AutoDisposingSubscriberImpl.this.onError(e); - } - - @Override public void onComplete() { - callMainSubscribeIfNecessary(s); - // Noop - we're unbound now - } - }), - getClass())) { + DisposableMaybeObserver o = new DisposableMaybeObserver() { + @Override public void onSuccess(Object o) { + lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoSubscriptionHelper.cancel(mainSubscription); + } + + @Override public void onError(Throwable e) { + lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED); + AutoDisposingSubscriberImpl.this.onError(e); + } + + @Override public void onComplete() { + mainSubscription.lazySet(AutoSubscriptionHelper.CANCELLED); + lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED); + // Noop - we're unbound now + } + }; + if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, o, getClass())) { + delegate.onSubscribe(this); + lifecycle.subscribe(o); if (AutoDisposeEndConsumerHelper.setOnce(mainSubscription, s, getClass())) { - delegate.onSubscribe(this); + AutoSubscriptionHelper.deferredSetOnce(ref, requested, s); } } } @@ -75,11 +82,8 @@ final class AutoDisposingSubscriberImpl implements AutoDisposingSubscriber * * @param n the request amount, positive */ - @Override public void request(long n) { - Subscription s = mainSubscription.get(); - if (s != null) { - s.request(n); - } + @SuppressWarnings("NullAway") @Override public void request(long n) { + AutoSubscriptionHelper.deferredRequest(ref, requested, n); } /** @@ -88,27 +92,8 @@ final class AutoDisposingSubscriberImpl implements AutoDisposingSubscriber *

This method is thread-safe and can be exposed as a public API. */ @Override public void cancel() { - synchronized (this) { - AutoDisposableHelper.dispose(lifecycleDisposable); - AutoSubscriptionHelper.cancel(mainSubscription); - } - } - - private void lazyCancel() { - synchronized (this) { - AutoDisposableHelper.dispose(lifecycleDisposable); - mainSubscription.lazySet(AutoSubscriptionHelper.CANCELLED); - } - } - - @SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors - void callMainSubscribeIfNecessary(Subscription s) { - // If we've never actually started the upstream subscription (i.e. requested immediately in - // onSubscribe and had a terminal event), we need to still send an empty subscription instance - // to abide by the Subscriber contract. - if (AutoSubscriptionHelper.setIfNotSet(mainSubscription, s)) { - delegate.onSubscribe(EmptySubscription.INSTANCE); - } + AutoDisposableHelper.dispose(lifecycleDisposable); + AutoSubscriptionHelper.cancel(mainSubscription); } @Override public boolean isDisposed() { @@ -121,21 +106,27 @@ void callMainSubscribeIfNecessary(Subscription s) { @Override public void onNext(T value) { if (!isDisposed()) { - delegate.onNext(value); + if (HalfSerializer.onNext(delegate, value, this, error)) { + // Terminal event occurred and was forwarded to the delegate, so clean up here + mainSubscription.lazySet(AutoSubscriptionHelper.CANCELLED); + AutoDisposableHelper.dispose(lifecycleDisposable); + } } } @Override public void onError(Throwable e) { if (!isDisposed()) { - lazyCancel(); - delegate.onError(e); + mainSubscription.lazySet(AutoSubscriptionHelper.CANCELLED); + AutoDisposableHelper.dispose(lifecycleDisposable); + HalfSerializer.onError(delegate, e, this, error); } } @Override public void onComplete() { if (!isDisposed()) { - lazyCancel(); - delegate.onComplete(); + mainSubscription.lazySet(AutoSubscriptionHelper.CANCELLED); + AutoDisposableHelper.dispose(lifecycleDisposable); + HalfSerializer.onComplete(delegate, this, error); } } } diff --git a/autodispose/src/main/java/com/uber/autodispose/ExceptionHelper.java b/autodispose/src/main/java/com/uber/autodispose/ExceptionHelper.java new file mode 100644 index 000000000..394998402 --- /dev/null +++ b/autodispose/src/main/java/com/uber/autodispose/ExceptionHelper.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.uber.autodispose; + +import io.reactivex.exceptions.CompositeException; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; + +/** + * Terminal atomics for Throwable containers. + */ +final class ExceptionHelper { + + /** Utility class. */ + private ExceptionHelper() { + throw new IllegalStateException("No instances!"); + } + + /** + * A singleton instance of a Throwable indicating a terminal state for exceptions, + * don't leak this. + */ + public static final Throwable TERMINATED = new Termination(); + + public static boolean addThrowable(AtomicReference field, Throwable exception) { + for (; ; ) { + Throwable current = field.get(); + + if (current == TERMINATED) { + return false; + } + + Throwable update; + if (current == null) { + update = exception; + } else { + update = new CompositeException(current, exception); + } + + if (field.compareAndSet(current, update)) { + return true; + } + } + } + + @Nullable + public static Throwable terminate(AtomicReference field) { + Throwable current = field.get(); + if (current != TERMINATED) { + current = field.getAndSet(TERMINATED); + } + return current; + } + + static final class Termination extends Throwable { + + Termination() { + super("No further exceptions"); + } + + @Override public synchronized Throwable fillInStackTrace() { + return this; + } + } +} diff --git a/autodispose/src/main/java/com/uber/autodispose/HalfSerializer.java b/autodispose/src/main/java/com/uber/autodispose/HalfSerializer.java new file mode 100644 index 000000000..f47fcd1a1 --- /dev/null +++ b/autodispose/src/main/java/com/uber/autodispose/HalfSerializer.java @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.uber.autodispose; + +import io.reactivex.Observer; +import io.reactivex.plugins.RxJavaPlugins; +import java.util.concurrent.atomic.AtomicInteger; +import org.reactivestreams.Subscriber; + +/** + * Utility methods to perform half-serialization: a form of serialization + * where onNext is guaranteed to be called from a single thread but + * onError or onComplete may be called from any threads. + *

+ * The onNext methods have been modified to return a boolean indicating whether or not the delegate + * observer was sent a terminal event. + */ +final class HalfSerializer { + /** Utility class. */ + private HalfSerializer() { + throw new IllegalStateException("No instances!"); + } + + /** + * Emits the given value if possible and terminates if there was an onComplete or onError + * while emitting, drops the value otherwise. + * + * @param the value type + * @param subscriber the target Subscriber to emit to + * @param value the value to emit + * @param wip the serialization work-in-progress counter/indicator + * @param error the holder of Throwables + * @return true if a terminal event was emitted to {@code observer}, false if not + */ + public static boolean onNext(Subscriber subscriber, + T value, + AtomicInteger wip, + AtomicThrowable error) { + if (wip.get() == 0 && wip.compareAndSet(0, 1)) { + subscriber.onNext(value); + if (wip.decrementAndGet() != 0) { + Throwable ex = error.terminate(); + if (ex != null) { + subscriber.onError(ex); + } else { + subscriber.onComplete(); + } + return true; + } + } + return false; + } + + /** + * Emits the given exception if possible or adds it to the given error container to + * be emitted by a concurrent onNext if one is running. + * Undeliverable exceptions are sent to the RxJavaPlugins.onError. + * + * @param subscriber the target Subscriber to emit to + * @param ex the Throwable to emit + * @param wip the serialization work-in-progress counter/indicator + * @param error the holder of Throwables + */ + public static void onError(Subscriber subscriber, + Throwable ex, + AtomicInteger wip, + AtomicThrowable error) { + if (error.addThrowable(ex)) { + if (wip.getAndIncrement() == 0) { + subscriber.onError(error.terminate()); + } + } else { + RxJavaPlugins.onError(ex); + } + } + + /** + * Emits an onComplete signal or an onError signal with the given error or indicates + * the concurrently running onNext should do that. + * + * @param subscriber the target Subscriber to emit to + * @param wip the serialization work-in-progress counter/indicator + * @param error the holder of Throwables + */ + public static void onComplete(Subscriber subscriber, + AtomicInteger wip, + AtomicThrowable error) { + if (wip.getAndIncrement() == 0) { + Throwable ex = error.terminate(); + if (ex != null) { + subscriber.onError(ex); + } else { + subscriber.onComplete(); + } + } + } + + /** + * Emits the given value if possible and terminates if there was an onComplete or onError + * while emitting, drops the value otherwise. + * + * @param the value type + * @param observer the target Observer to emit to + * @param value the value to emit + * @param wip the serialization work-in-progress counter/indicator + * @param error the holder of Throwables + * @return true if a terminal event was emitted to {@code observer}, false if not + */ + public static boolean onNext(Observer observer, + T value, + AtomicInteger wip, + AtomicThrowable error) { + if (wip.get() == 0 && wip.compareAndSet(0, 1)) { + observer.onNext(value); + if (wip.decrementAndGet() != 0) { + Throwable ex = error.terminate(); + if (ex != null) { + observer.onError(ex); + } else { + observer.onComplete(); + } + return true; + } + } + return false; + } + + /** + * Emits the given exception if possible or adds it to the given error container to + * be emitted by a concurrent onNext if one is running. + * Undeliverable exceptions are sent to the RxJavaPlugins.onError. + * + * @param observer the target Subscriber to emit to + * @param ex the Throwable to emit + * @param wip the serialization work-in-progress counter/indicator + * @param error the holder of Throwables + */ + public static void onError(Observer observer, + Throwable ex, + AtomicInteger wip, + AtomicThrowable error) { + if (error.addThrowable(ex)) { + if (wip.getAndIncrement() == 0) { + observer.onError(error.terminate()); + } + } else { + RxJavaPlugins.onError(ex); + } + } + + /** + * Emits an onComplete signal or an onError signal with the given error or indicates + * the concurrently running onNext should do that. + * + * @param observer the target Subscriber to emit to + * @param wip the serialization work-in-progress counter/indicator + * @param error the holder of Throwables + */ + public static void onComplete(Observer observer, AtomicInteger wip, AtomicThrowable error) { + if (wip.getAndIncrement() == 0) { + Throwable ex = error.terminate(); + if (ex != null) { + observer.onError(ex); + } else { + observer.onComplete(); + } + } + } +} diff --git a/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingCompletableObserver.java b/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingCompletableObserver.java index a9b766ae6..7dd24112b 100644 --- a/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingCompletableObserver.java +++ b/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingCompletableObserver.java @@ -27,7 +27,7 @@ public interface AutoDisposingCompletableObserver extends CompletableObserver, Disposable { /** - * @return The delegate {@link CompletableObserver} that is used under the hood forintrospection + * @return The delegate {@link CompletableObserver} that is used under the hood for introspection * purposes. This will be updated once LambdaIntrospection is out of @Experimental in RxJava. */ @Experimental diff --git a/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingObserver.java b/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingObserver.java index b8aeda459..1314bfa2a 100644 --- a/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingObserver.java +++ b/autodispose/src/main/java/com/uber/autodispose/observers/AutoDisposingObserver.java @@ -27,7 +27,7 @@ public interface AutoDisposingObserver extends Observer, Disposable { /** - * @return The delegate {@link Observer} that is used under the hood for introspection purpose. + * @return The delegate {@link Observer} that is used under the hood for introspection purposes. * This will be updated once LambdaIntrospection is out of @Experimental in RxJava. */ @Experimental diff --git a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeCompletableObserverTest.java b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeCompletableObserverTest.java index 89b8e1342..25286832a 100755 --- a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeCompletableObserverTest.java +++ b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeCompletableObserverTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; +import org.junit.Rule; import org.junit.Test; import static com.google.common.truth.Truth.assertThat; @@ -48,6 +49,8 @@ public class AutoDisposeCompletableObserverTest { } }; + @Rule public RxErrorsRule rule = new RxErrorsRule(); + @After public void resetPlugins() { AutoDisposePlugins.reset(); } @@ -354,4 +357,15 @@ public CompletableObserver apply(Completable source, CompletableObserver observe assertThat(i.get()).isEqualTo(1); assertThat(lifecycle.hasObservers()).isFalse(); } + + @Test public void autoDispose_withScopeProviderCompleted_shouldNotReportDoubleSubscriptions() { + TestObserver o = new TestObserver<>(); + CompletableSubject.create() + .to(AutoDispose.with(ScopeProvider.UNBOUND).forCompletable()) + .subscribe(o); + o.assertNoValues(); + o.assertNoErrors(); + + rule.assertNoErrors(); + } } diff --git a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeMaybeObserverTest.java b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeMaybeObserverTest.java index cbe0e2c10..1e56f1365 100755 --- a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeMaybeObserverTest.java +++ b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeMaybeObserverTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; +import org.junit.Rule; import org.junit.Test; import static com.google.common.truth.Truth.assertThat; @@ -47,6 +48,8 @@ public class AutoDisposeMaybeObserverTest { } }; + @Rule public RxErrorsRule rule = new RxErrorsRule(); + @After public void resetPlugins() { AutoDisposePlugins.reset(); } @@ -384,4 +387,15 @@ public class AutoDisposeMaybeObserverTest { assertThat(i.get()).isEqualTo(1); assertThat(lifecycle.hasObservers()).isFalse(); } + + @Test public void autoDispose_withScopeProviderCompleted_shouldNotReportDoubleSubscriptions() { + TestObserver o = new TestObserver<>(); + MaybeSubject.create() + .to(AutoDispose.with(ScopeProvider.UNBOUND).forMaybe()) + .subscribe(o); + o.assertNoValues(); + o.assertNoErrors(); + + rule.assertNoErrors(); + } } diff --git a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeObserverTest.java b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeObserverTest.java index 4b4c81022..9274e0e9d 100755 --- a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeObserverTest.java +++ b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeObserverTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; +import org.junit.Rule; import org.junit.Test; import static com.google.common.truth.Truth.assertThat; @@ -47,6 +48,8 @@ public class AutoDisposeObserverTest { } }; + @Rule public RxErrorsRule rule = new RxErrorsRule(); + @After public void resetPlugins() { AutoDisposePlugins.reset(); } @@ -323,4 +326,15 @@ public class AutoDisposeObserverTest { assertThat(i.get()).isEqualTo(1); assertThat(lifecycle.hasObservers()).isFalse(); } + + @Test public void autoDispose_withScopeProviderCompleted_shouldNotReportDoubleSubscriptions() { + TestObserver o = new TestObserver<>(); + PublishSubject.create() + .to(AutoDispose.with(ScopeProvider.UNBOUND).forObservable()) + .subscribe(o); + o.assertNoValues(); + o.assertNoErrors(); + + rule.assertNoErrors(); + } } diff --git a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSingleObserverTest.java b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSingleObserverTest.java index ea6260e06..2dc4a0ead 100755 --- a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSingleObserverTest.java +++ b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSingleObserverTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; +import org.junit.Rule; import org.junit.Test; import static com.google.common.truth.Truth.assertThat; @@ -48,6 +49,8 @@ public class AutoDisposeSingleObserverTest { } }; + @Rule public RxErrorsRule rule = new RxErrorsRule(); + @After public void resetPlugins() { AutoDisposePlugins.reset(); } @@ -354,4 +357,15 @@ public class AutoDisposeSingleObserverTest { assertThat(i.get()).isEqualTo(1); assertThat(lifecycle.hasObservers()).isFalse(); } + + @Test public void autoDispose_withScopeProviderCompleted_shouldNotReportDoubleSubscriptions() { + TestObserver o = new TestObserver<>(); + SingleSubject.create() + .to(AutoDispose.with(ScopeProvider.UNBOUND).forSingle()) + .subscribe(o); + o.assertNoValues(); + o.assertNoErrors(); + + rule.assertNoErrors(); + } } diff --git a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSubscriberTest.java b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSubscriberTest.java index 23d3f7669..f0349af28 100755 --- a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSubscriberTest.java +++ b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSubscriberTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; +import org.junit.Rule; import org.junit.Test; import org.reactivestreams.Subscriber; @@ -42,6 +43,8 @@ public class AutoDisposeSubscriberTest { + @Rule public RxErrorsRule rule = new RxErrorsRule(); + @After public void resetPlugins() { AutoDisposePlugins.reset(); } @@ -334,4 +337,15 @@ public class AutoDisposeSubscriberTest { assertThat(i.get()).isEqualTo(1); assertThat(lifecycle.hasObservers()).isFalse(); } + + @Test public void autoDispose_withScopeProviderCompleted_shouldNotReportDoubleSubscriptions() { + TestSubscriber o = new TestSubscriber<>(); + PublishProcessor.create() + .to(AutoDispose.with(ScopeProvider.UNBOUND).forFlowable()) + .subscribe(o); + o.assertNoValues(); + o.assertNoErrors(); + + rule.assertNoErrors(); + } } diff --git a/autodispose/src/test/java/com/uber/autodispose/RxErrorsRule.java b/autodispose/src/test/java/com/uber/autodispose/RxErrorsRule.java index 7ddffb566..aa85375b3 100644 --- a/autodispose/src/test/java/com/uber/autodispose/RxErrorsRule.java +++ b/autodispose/src/test/java/com/uber/autodispose/RxErrorsRule.java @@ -65,12 +65,13 @@ public CompositeException takeCompositeException() { } public boolean hasErrors() { - Throwable error; - try { - error = errors.pollFirst(0, TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + Throwable error = errors.peek(); return error != null; } + + public void assertNoErrors() { + if (hasErrors()) { + throw new AssertionError("Expected no errors but found " + getErrors()); + } + } }