diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposableHelper.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposableHelper.java index 67a3fc1ed..f64ead6b9 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposableHelper.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposableHelper.java @@ -1,4 +1,4 @@ -/** +/* * Copyright 2016-present, RxJava Contributors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except @@ -17,9 +17,7 @@ package com.uber.autodispose; import io.reactivex.disposables.Disposable; -import io.reactivex.plugins.RxJavaPlugins; import java.util.concurrent.atomic.AtomicReference; -import javax.annotation.Nullable; /** * Utility methods for working with Disposables atomically. Copied from the RxJava implementation. @@ -30,51 +28,6 @@ enum AutoDisposableHelper implements Disposable { */ DISPOSED; - static boolean isDisposed(Disposable d) { - return d == DISPOSED; - } - - static boolean set(AtomicReference field, @Nullable Disposable d) { - for (; ; ) { - Disposable current = field.get(); - if (current == DISPOSED) { - if (d != null) { - d.dispose(); - } - return false; - } - if (field.compareAndSet(current, d)) { - if (current != null) { - current.dispose(); - } - return true; - } - } - } - - /** - * Atomically sets the field to the given non-null Disposable and returns true - * or returns false if the field is non-null. - * If the target field contains the common DISPOSED instance, the supplied disposable - * is disposed. If the field contains other non-null Disposable, an IllegalStateException - * is signalled to the RxJavaPlugins.onError hook. - * - * @param field the target field - * @param d the disposable to set, not null - * @return true if the operation succeeded, false - */ - static boolean setOnce(AtomicReference field, Disposable d) { - AutoDisposeUtil.checkNotNull(d, "d is null"); - if (!field.compareAndSet(null, d)) { - d.dispose(); - if (field.get() != DISPOSED) { - reportDisposableSet(); - } - return false; - } - return true; - } - /** * Atomically sets the field to the given non-null Disposable and returns true * or returns false if the field is non-null. @@ -88,30 +41,6 @@ static boolean setIfNotSet(AtomicReference field, Disposable d) { return field.compareAndSet(null, d); } - /** - * Atomically replaces the Disposable in the field with the given new Disposable - * but does not dispose the old one. - * - * @param field the target field to change - * @param d the new disposable, null allowed - * @return true if the operation succeeded, false if the target field contained - * the common DISPOSED instance and the given disposable (if not null) is disposed. - */ - static boolean replace(AtomicReference field, @Nullable Disposable d) { - for (; ; ) { - Disposable current = field.get(); - if (current == DISPOSED) { - if (d != null) { - d.dispose(); - } - return false; - } - if (field.compareAndSet(current, d)) { - return true; - } - } - } - /** * Atomically disposes the Disposable in the field if not already disposed. * @@ -133,35 +62,6 @@ static boolean dispose(AtomicReference field) { return false; } - /** - * Verifies that current is null, next is not null, otherwise signals errors - * to the RxJavaPlugins and returns false. - * - * @param current the current Disposable, expected to be null - * @param next the next Disposable, expected to be non-null - * @return true if the validation succeeded - */ - static boolean validate(@Nullable Disposable current, Disposable next) { - //noinspection ConstantConditions leftover from original RxJava implementation - if (next == null) { - RxJavaPlugins.onError(new NullPointerException("next is null")); - return false; - } - if (current != null) { - next.dispose(); - reportDisposableSet(); - return false; - } - return true; - } - - /** - * Reports that the disposable is already set to the RxJavaPlugins error handler. - */ - static void reportDisposableSet() { - RxJavaPlugins.onError(new IllegalStateException("Disposable already set!")); - } - @Override public void dispose() { // deliberately no-op } diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposeEndConsumerHelper.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposeEndConsumerHelper.java new file mode 100644 index 000000000..bdd66870e --- /dev/null +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposeEndConsumerHelper.java @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.autodispose; + +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.ProtocolViolationException; +import io.reactivex.plugins.RxJavaPlugins; +import java.util.concurrent.atomic.AtomicReference; +import org.reactivestreams.Subscription; + +/** + * Utility class to help report multiple subscriptions with the same + * consumer type instead of the internal "Disposable already set!" message + * that is practically reserved for internal operators and indicate bugs in them. + * + * Copied from the RxJava implementation. + */ +final class AutoDisposeEndConsumerHelper { + + /** + * Utility class. + */ + private AutoDisposeEndConsumerHelper() { + throw new IllegalStateException("No instances!"); + } + + /** + * Atomically updates the target upstream AtomicReference from null to the non-null + * next Disposable, otherwise disposes next and reports a ProtocolViolationException + * if the AtomicReference doesn't contain the shared disposed indicator. + * + * @param upstream the target AtomicReference to update + * @param next the Disposable to set on it atomically + * @param observer the class of the consumer to have a personalized + * error message if the upstream already contains a non-cancelled Disposable. + * @return true if successful, false if the content of the AtomicReference was non null + */ + public static boolean setOnce(AtomicReference upstream, Disposable next, + Class observer) { + AutoDisposeUtil.checkNotNull(next, "next is null"); + if (!upstream.compareAndSet(null, next)) { + next.dispose(); + if (upstream.get() != AutoDisposableHelper.DISPOSED) { + reportDoubleSubscription(observer); + } + return false; + } + return true; + } + + /** + * Atomically updates the target upstream AtomicReference from null to the non-null + * next Subscription, otherwise cancels next and reports a ProtocolViolationException + * if the AtomicReference doesn't contain the shared cancelled indicator. + * + * @param upstream the target AtomicReference to update + * @param next the Subscription to set on it atomically + * @param subscriber the class of the consumer to have a personalized + * error message if the upstream already contains a non-cancelled Subscription. + * @return true if successful, false if the content of the AtomicReference was non null + */ + public static boolean setOnce(AtomicReference upstream, Subscription next, + Class subscriber) { + AutoDisposeUtil.checkNotNull(next, "next is null"); + if (!upstream.compareAndSet(null, next)) { + next.cancel(); + if (upstream.get() != AutoSubscriptionHelper.CANCELLED) { + reportDoubleSubscription(subscriber); + } + return false; + } + return true; + } + + /** + * Builds the error message with the consumer class. + * + * @param consumer the class of the consumer + * @return the error message string + */ + public static String composeMessage(String consumer) { + return "It is not allowed to subscribe with a(n) " + + consumer + + " multiple times. " + + "Please create a fresh instance of " + + consumer + + " and subscribe that to the target source instead."; + } + + /** + * Report a ProtocolViolationException with a personalized message referencing + * the simple type name of the consumer class and report it via + * RxJavaPlugins.onError. + * + * @param consumer the class of the consumer + */ + public static void reportDoubleSubscription(Class consumer) { + RxJavaPlugins.onError(new ProtocolViolationException(composeMessage(consumer.getName()))); + } +} diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java index 6dbecc32f..45336c8f4 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingCompletableObserverImpl.java @@ -38,22 +38,22 @@ final class AutoDisposingCompletableObserverImpl implements AutoDisposingComplet } @Override public void onSubscribe(final Disposable d) { - if (AutoDisposableHelper.setOnce(lifecycleDisposable, + if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, lifecycle.doOnEvent(new BiConsumer() { - @Override - public void accept(Object o, Throwable throwable) throws Exception { + @Override public void accept(Object o, Throwable throwable) throws Exception { callMainSubscribeIfNecessary(d); } - }).subscribe(new Consumer() { - @Override public void accept(Object o) throws Exception { - dispose(); - } - }, new Consumer() { - @Override public void accept(Throwable e1) throws Exception { - onError(e1); - } - }))) { - if (AutoDisposableHelper.setOnce(mainDisposable, d)) { + }) + .subscribe(new Consumer() { + @Override public void accept(Object o) throws Exception { + dispose(); + } + }, new Consumer() { + @Override public void accept(Throwable e1) throws Exception { + onError(e1); + } + }), getClass())) { + if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) { delegate.onSubscribe(this); } } diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java index 2be073ff7..de1c2b4ae 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingMaybeObserverImpl.java @@ -38,24 +38,22 @@ final class AutoDisposingMaybeObserverImpl implements AutoDisposingMaybeObser } @Override public void onSubscribe(final Disposable d) { - if (AutoDisposableHelper.setOnce(lifecycleDisposable, - lifecycle.doOnEvent(new BiConsumer() { - @Override - public void accept(Object o, Throwable throwable) throws Exception { - callMainSubscribeIfNecessary(d); - } - }).subscribe(new Consumer() { - @Override - public void accept(Object o) throws Exception { + if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, + lifecycle.doOnEvent(new BiConsumer() { + @Override public void accept(Object o, Throwable throwable) throws Exception { + callMainSubscribeIfNecessary(d); + } + }) + .subscribe(new Consumer() { + @Override public void accept(Object o) throws Exception { dispose(); } }, new Consumer() { - @Override - public void accept(Throwable e1) throws Exception { + @Override public void accept(Throwable e1) throws Exception { onError(e1); } - }))) { - if (AutoDisposableHelper.setOnce(mainDisposable, d)) { + }), getClass())) { + if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) { delegate.onSubscribe(this); } } diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java index 5ff7ef209..2cd1eb9eb 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingObserverImpl.java @@ -38,22 +38,22 @@ final class AutoDisposingObserverImpl implements AutoDisposingObserver { } @Override public void onSubscribe(final Disposable d) { - if (AutoDisposableHelper.setOnce(lifecycleDisposable, + if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, lifecycle.doOnEvent(new BiConsumer() { - @Override - public void accept(Object o, Throwable throwable) throws Exception { + @Override public void accept(Object o, Throwable throwable) throws Exception { callMainSubscribeIfNecessary(d); } - }).subscribe(new Consumer() { - @Override public void accept(Object o) throws Exception { - dispose(); - } - }, new Consumer() { - @Override public void accept(Throwable e) throws Exception { - AutoDisposingObserverImpl.this.onError(e); - } - }))) { - if (AutoDisposableHelper.setOnce(mainDisposable, d)) { + }) + .subscribe(new Consumer() { + @Override public void accept(Object o) throws Exception { + dispose(); + } + }, new Consumer() { + @Override public void accept(Throwable e) throws Exception { + AutoDisposingObserverImpl.this.onError(e); + } + }), getClass())) { + if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) { delegate.onSubscribe(this); } } diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java index d9b4b46f1..972861462 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSingleObserverImpl.java @@ -38,22 +38,22 @@ final class AutoDisposingSingleObserverImpl implements AutoDisposingSingleObs } @Override public void onSubscribe(final Disposable d) { - if (AutoDisposableHelper.setOnce(lifecycleDisposable, + if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, lifecycle.doOnEvent(new BiConsumer() { - @Override - public void accept(Object o, Throwable throwable) throws Exception { + @Override public void accept(Object o, Throwable throwable) throws Exception { callMainSubscribeIfNecessary(d); } - }).subscribe(new Consumer() { - @Override public void accept(Object o) throws Exception { - dispose(); - } - }, new Consumer() { - @Override public void accept(Throwable e) throws Exception { - AutoDisposingSingleObserverImpl.this.onError(e); - } - }))) { - if (AutoDisposableHelper.setOnce(mainDisposable, d)) { + }) + .subscribe(new Consumer() { + @Override public void accept(Object o) throws Exception { + dispose(); + } + }, new Consumer() { + @Override public void accept(Throwable e) throws Exception { + AutoDisposingSingleObserverImpl.this.onError(e); + } + }), getClass())) { + if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) { delegate.onSubscribe(this); } } diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java index 6ec3f3468..295b3dbf6 100755 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposingSubscriberImpl.java @@ -39,22 +39,22 @@ final class AutoDisposingSubscriberImpl implements AutoDisposingSubscriber } @Override public void onSubscribe(final Subscription s) { - if (AutoDisposableHelper.setOnce(lifecycleDisposable, + if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, lifecycle.doOnEvent(new BiConsumer() { - @Override - public void accept(Object o, Throwable throwable) throws Exception { + @Override public void accept(Object o, Throwable throwable) throws Exception { callMainSubscribeIfNecessary(s); } - }).subscribe(new Consumer() { - @Override public void accept(Object o) throws Exception { - dispose(); - } - }, new Consumer() { - @Override public void accept(Throwable e) throws Exception { - AutoDisposingSubscriberImpl.this.onError(e); - } - }))) { - if (AutoSubscriptionHelper.setOnce(mainSubscription, s)) { + }) + .subscribe(new Consumer() { + @Override public void accept(Object o) throws Exception { + dispose(); + } + }, new Consumer() { + @Override public void accept(Throwable e) throws Exception { + AutoDisposingSubscriberImpl.this.onError(e); + } + }), getClass())) { + if (AutoDisposeEndConsumerHelper.setOnce(mainSubscription, s, getClass())) { delegate.onSubscribe(this); } } diff --git a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSubscriberTest.java b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSubscriberTest.java index a1e5a1737..45655244e 100755 --- a/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSubscriberTest.java +++ b/autodispose/src/test/java/com/uber/autodispose/AutoDisposeSubscriberTest.java @@ -202,16 +202,14 @@ public class AutoDisposeSubscriberTest { @Test public void autoDispose_withProviderAndNoOpPlugin_withoutStarting_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer() { - @Override - public void accept(OutsideLifecycleException e) throws Exception { } + @Override public void accept(OutsideLifecycleException e) throws Exception { } }); BehaviorSubject lifecycle = BehaviorSubject.create(); TestSubscriber o = new TestSubscriber<>(); LifecycleScopeProvider provider = TestUtil.makeLifecycleProvider(lifecycle); PublishProcessor source = PublishProcessor.create(); - source - .to(new FlowableScoper(provider)) - .subscribe(o); + source.to(new FlowableScoper(provider)) + .subscribe(o); assertThat(source.hasSubscribers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); @@ -221,8 +219,7 @@ public void accept(OutsideLifecycleException e) throws Exception { } @Test public void autoDispose_withProviderAndNoOpPlugin_afterEnding_shouldFailSilently() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer() { - @Override - public void accept(OutsideLifecycleException e) throws Exception { + @Override public void accept(OutsideLifecycleException e) throws Exception { // Noop } }); @@ -233,9 +230,8 @@ public void accept(OutsideLifecycleException e) throws Exception { TestSubscriber o = new TestSubscriber<>(); LifecycleScopeProvider provider = TestUtil.makeLifecycleProvider(lifecycle); PublishProcessor source = PublishProcessor.create(); - source - .to(new FlowableScoper(provider)) - .subscribe(o); + source.to(new FlowableScoper(provider)) + .subscribe(o); assertThat(source.hasSubscribers()).isFalse(); assertThat(lifecycle.hasObservers()).isFalse(); @@ -245,8 +241,7 @@ public void accept(OutsideLifecycleException e) throws Exception { @Test public void autoDispose_withProviderAndPlugin_withoutStarting_shouldFailWithExp() { AutoDisposePlugins.setOutsideLifecycleHandler(new Consumer() { - @Override - public void accept(OutsideLifecycleException e) throws Exception { + @Override public void accept(OutsideLifecycleException e) throws Exception { // Wrap in an IllegalStateException so we can verify this is the exception we see on the // other side throw new IllegalStateException(e); @@ -256,16 +251,14 @@ public void accept(OutsideLifecycleException e) throws Exception { TestSubscriber o = new TestSubscriber<>(); LifecycleScopeProvider provider = TestUtil.makeLifecycleProvider(lifecycle); PublishProcessor source = PublishProcessor.create(); - source - .to(new FlowableScoper(provider)) - .subscribe(o); + source.to(new FlowableScoper(provider)) + .subscribe(o); o.assertNoValues(); o.assertError(new Predicate() { - @Override - public boolean test(Throwable throwable) throws Exception { + @Override public boolean test(Throwable throwable) throws Exception { return throwable instanceof IllegalStateException - && throwable.getCause() instanceof OutsideLifecycleException; + && throwable.getCause() instanceof OutsideLifecycleException; } }); }