diff --git a/src/main/java/io/reactivex/internal/util/EndConsumerHelper.java b/src/main/java/io/reactivex/internal/util/EndConsumerHelper.java new file mode 100644 index 0000000000..48b8032a09 --- /dev/null +++ b/src/main/java/io/reactivex/internal/util/EndConsumerHelper.java @@ -0,0 +1,150 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.util; + +import java.util.concurrent.atomic.*; + +import org.reactivestreams.Subscription; + +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.ProtocolViolationException; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.plugins.RxJavaPlugins; + +/** + * Utility class to help report multiple subscriptions with the same + * consumer type instead of the internal "Disposable already set!" message + * that is practically reserved for internal operators and indicate bugs in them. + */ +public final class EndConsumerHelper { + + /** + * Utility class. + */ + private EndConsumerHelper() { + throw new IllegalStateException("No instances!"); + } + + /** + * Ensures that the upstream Disposable is null and returns true, otherwise + * disposes the next Disposable and if the upstream is not the shared + * disposed instance, reports a ProtocolViolationException due to + * multiple subscribe attempts. + * @param upstream the upstream current value + * @param next the Disposable to check for nullness and dispose if necessary + * @param observer the class of the consumer to have a personalized + * error message if the upstream already contains a non-cancelled Disposable. + * @return true if successful, false if the upstream was non null + */ + public static boolean validate(Disposable upstream, Disposable next, Class observer) { + ObjectHelper.requireNonNull(next, "next is null"); + if (upstream != null) { + next.dispose(); + if (upstream != DisposableHelper.DISPOSED) { + reportDoubleSubscription(observer); + } + return false; + } + return true; + } + + /** + * Atomically updates the target upstream AtomicReference from null to the non-null + * next Disposable, otherwise disposes next and reports a ProtocolViolationException + * if the AtomicReference doesn't contain the shared disposed indicator. + * @param upstream the target AtomicReference to update + * @param next the Disposable to set on it atomically + * @param observer the class of the consumer to have a personalized + * error message if the upstream already contains a non-cancelled Disposable. + * @return true if successful, false if the content of the AtomicReference was non null + */ + public static boolean setOnce(AtomicReference upstream, Disposable next, Class observer) { + ObjectHelper.requireNonNull(next, "next is null"); + if (!upstream.compareAndSet(null, next)) { + next.dispose(); + if (upstream.get() != DisposableHelper.DISPOSED) { + reportDoubleSubscription(observer); + } + return false; + } + return true; + } + + /** + * Ensures that the upstream Subscription is null and returns true, otherwise + * cancels the next Subscription and if the upstream is not the shared + * cancelled instance, reports a ProtocolViolationException due to + * multiple subscribe attempts. + * @param upstream the upstream current value + * @param next the Subscription to check for nullness and cancel if necessary + * @param subscriber the class of the consumer to have a personalized + * error message if the upstream already contains a non-cancelled Subscription. + * @return true if successful, false if the upstream was non null + */ + public static boolean validate(Subscription upstream, Subscription next, Class subscriber) { + ObjectHelper.requireNonNull(next, "next is null"); + if (upstream != null) { + next.cancel(); + if (upstream != SubscriptionHelper.CANCELLED) { + reportDoubleSubscription(subscriber); + } + return false; + } + return true; + } + + /** + * Atomically updates the target upstream AtomicReference from null to the non-null + * next Subscription, otherwise cancels next and reports a ProtocolViolationException + * if the AtomicReference doesn't contain the shared cancelled indicator. + * @param upstream the target AtomicReference to update + * @param next the Subscription to set on it atomically + * @param subscriber the class of the consumer to have a personalized + * error message if the upstream already contains a non-cancelled Subscription. + * @return true if successful, false if the content of the AtomicReference was non null + */ + public static boolean setOnce(AtomicReference upstream, Subscription next, Class subscriber) { + ObjectHelper.requireNonNull(next, "next is null"); + if (!upstream.compareAndSet(null, next)) { + next.cancel(); + if (upstream.get() != SubscriptionHelper.CANCELLED) { + reportDoubleSubscription(subscriber); + } + return false; + } + return true; + } + + /** + * Builds the error message with the consumer class. + * @param consumer the class of the consumer + * @return the error message string + */ + public static String composeMessage(String consumer) { + return "It is not allowed to subscribe with a(n) " + consumer + " multiple times. " + + "Please create a fresh instance of " + consumer + " and subscribe that to the target source instead."; + } + + /** + * Report a ProtocolViolationException with a personalized message referencing + * the simple type name of the consumer class and report it via + * RxJavaPlugins.onError. + * @param consumer the class of the consumer + */ + public static void reportDoubleSubscription(Class consumer) { + RxJavaPlugins.onError(new ProtocolViolationException(composeMessage(consumer.getName()))); + } +} diff --git a/src/main/java/io/reactivex/observers/DefaultObserver.java b/src/main/java/io/reactivex/observers/DefaultObserver.java index c0ef8475c3..27fcab8596 100644 --- a/src/main/java/io/reactivex/observers/DefaultObserver.java +++ b/src/main/java/io/reactivex/observers/DefaultObserver.java @@ -17,6 +17,7 @@ import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.util.EndConsumerHelper; /** * Abstract base implementation of an {@link io.reactivex.Observer Observer} with support for cancelling a @@ -30,7 +31,7 @@ * *

Like all other consumers, {@code DefaultObserver} can be subscribed only once. * Any subsequent attempt to subscribe it to a new source will yield an - * {@link IllegalStateException} with message {@code "Disposable already set!"}. + * {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) multiple times."}. * *

Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)} * and {@link #onComplete()} are not allowed to throw any unchecked exceptions. @@ -67,7 +68,7 @@ public abstract class DefaultObserver implements Observer { private Disposable s; @Override public final void onSubscribe(@NonNull Disposable s) { - if (DisposableHelper.validate(this.s, s)) { + if (EndConsumerHelper.validate(this.s, s, getClass())) { this.s = s; onStart(); } diff --git a/src/main/java/io/reactivex/observers/DisposableCompletableObserver.java b/src/main/java/io/reactivex/observers/DisposableCompletableObserver.java index e0d6902a76..4314112dc8 100644 --- a/src/main/java/io/reactivex/observers/DisposableCompletableObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableCompletableObserver.java @@ -19,6 +19,7 @@ import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.util.EndConsumerHelper; /** * An abstract {@link CompletableObserver} that allows asynchronous cancellation by implementing Disposable. @@ -27,7 +28,7 @@ * *

Like all other consumers, {@code DisposableCompletableObserver} can be subscribed only once. * Any subsequent attempt to subscribe it to a new source will yield an - * {@link IllegalStateException} with message {@code "Disposable already set!"}. + * {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) multiple times."}. * *

Implementation of {@link #onStart()}, {@link #onError(Throwable)} and * {@link #onComplete()} are not allowed to throw any unchecked exceptions. @@ -55,7 +56,7 @@ public abstract class DisposableCompletableObserver implements CompletableObserv @Override public final void onSubscribe(@NonNull Disposable s) { - if (DisposableHelper.setOnce(this.s, s)) { + if (EndConsumerHelper.setOnce(this.s, s, getClass())) { onStart(); } } diff --git a/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java b/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java index 9adcd89767..553f6f3c37 100644 --- a/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java @@ -19,6 +19,7 @@ import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.util.EndConsumerHelper; /** * An abstract {@link MaybeObserver} that allows asynchronous cancellation by implementing Disposable. @@ -31,7 +32,7 @@ * *

Like all other consumers, {@code DisposableMaybeObserver} can be subscribed only once. * Any subsequent attempt to subscribe it to a new source will yield an - * {@link IllegalStateException} with message {@code "Disposable already set!"}. + * {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) multiple times."}. * *

Implementation of {@link #onStart()}, {@link #onSuccess(Object)}, {@link #onError(Throwable)} and * {@link #onComplete()} are not allowed to throw any unchecked exceptions. @@ -66,7 +67,7 @@ public abstract class DisposableMaybeObserver implements MaybeObserver, Di @Override public final void onSubscribe(@NonNull Disposable s) { - if (DisposableHelper.setOnce(this.s, s)) { + if (EndConsumerHelper.setOnce(this.s, s, getClass())) { onStart(); } } diff --git a/src/main/java/io/reactivex/observers/DisposableObserver.java b/src/main/java/io/reactivex/observers/DisposableObserver.java index 79fd4db318..a6807af037 100644 --- a/src/main/java/io/reactivex/observers/DisposableObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableObserver.java @@ -19,6 +19,7 @@ import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.util.EndConsumerHelper; /** * An abstract {@link Observer} that allows asynchronous cancellation by implementing Disposable. @@ -30,7 +31,7 @@ * *

Like all other consumers, {@code DisposableObserver} can be subscribed only once. * Any subsequent attempt to subscribe it to a new source will yield an - * {@link IllegalStateException} with message {@code "Disposable already set!"}. + * {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) multiple times."}. * *

Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)} * and {@link #onComplete()} are not allowed to throw any unchecked exceptions. @@ -69,7 +70,7 @@ public abstract class DisposableObserver implements Observer, Disposable { @Override public final void onSubscribe(@NonNull Disposable s) { - if (DisposableHelper.setOnce(this.s, s)) { + if (EndConsumerHelper.setOnce(this.s, s, getClass())) { onStart(); } } diff --git a/src/main/java/io/reactivex/observers/DisposableSingleObserver.java b/src/main/java/io/reactivex/observers/DisposableSingleObserver.java index e086f466c2..ad76237460 100644 --- a/src/main/java/io/reactivex/observers/DisposableSingleObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableSingleObserver.java @@ -19,6 +19,7 @@ import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.util.EndConsumerHelper; /** * An abstract {@link SingleObserver} that allows asynchronous cancellation by implementing Disposable. @@ -27,7 +28,7 @@ * *

Like all other consumers, {@code DisposableSingleObserver} can be subscribed only once. * Any subsequent attempt to subscribe it to a new source will yield an - * {@link IllegalStateException} with message {@code "Disposable already set!"}. + * {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) multiple times."}. * *

Implementation of {@link #onStart()}, {@link #onSuccess(Object)} and {@link #onError(Throwable)} * are not allowed to throw any unchecked exceptions. @@ -58,7 +59,7 @@ public abstract class DisposableSingleObserver implements SingleObserver, @Override public final void onSubscribe(@NonNull Disposable s) { - if (DisposableHelper.setOnce(this.s, s)) { + if (EndConsumerHelper.setOnce(this.s, s, getClass())) { onStart(); } } diff --git a/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java b/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java index 58906dcb6c..5f72922205 100644 --- a/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java +++ b/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java @@ -20,6 +20,7 @@ import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.*; import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.util.EndConsumerHelper; /** * An abstract {@link CompletableObserver} that allows asynchronous cancellation of its subscription and associated resources. @@ -44,7 +45,7 @@ * *

Like all other consumers, {@code ResourceCompletableObserver} can be subscribed only once. * Any subsequent attempt to subscribe it to a new source will yield an - * {@link IllegalStateException} with message {@code "Disposable already set!"}. + * {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) multiple times."}. * *

Implementation of {@link #onStart()}, {@link #onError(Throwable)} * and {@link #onComplete()} are not allowed to throw any unchecked exceptions. @@ -92,7 +93,7 @@ public final void add(@NonNull Disposable resource) { @Override public final void onSubscribe(@NonNull Disposable s) { - if (DisposableHelper.setOnce(this.s, s)) { + if (EndConsumerHelper.setOnce(this.s, s, getClass())) { onStart(); } } diff --git a/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java b/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java index 4a0287e01f..65c4411897 100644 --- a/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java +++ b/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java @@ -20,6 +20,7 @@ import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.*; import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.util.EndConsumerHelper; /** * An abstract {@link MaybeObserver} that allows asynchronous cancellation of its subscription and associated resources. @@ -48,7 +49,7 @@ * *

Like all other consumers, {@code ResourceMaybeObserver} can be subscribed only once. * Any subsequent attempt to subscribe it to a new source will yield an - * {@link IllegalStateException} with message {@code "Disposable already set!"}. + * {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) multiple times."}. * *

Implementation of {@link #onStart()}, {@link #onSuccess(Object)}, {@link #onError(Throwable)} * and {@link #onComplete()} are not allowed to throw any unchecked exceptions. @@ -102,7 +103,7 @@ public final void add(@NonNull Disposable resource) { @Override public final void onSubscribe(@NonNull Disposable s) { - if (DisposableHelper.setOnce(this.s, s)) { + if (EndConsumerHelper.setOnce(this.s, s, getClass())) { onStart(); } } diff --git a/src/main/java/io/reactivex/observers/ResourceObserver.java b/src/main/java/io/reactivex/observers/ResourceObserver.java index 834b73674a..6aad24e455 100644 --- a/src/main/java/io/reactivex/observers/ResourceObserver.java +++ b/src/main/java/io/reactivex/observers/ResourceObserver.java @@ -20,6 +20,7 @@ import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.*; import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.util.EndConsumerHelper; /** * An abstract {@link Observer} that allows asynchronous cancellation of its subscription and associated resources. @@ -41,7 +42,7 @@ * *

Like all other consumers, {@code ResourceObserver} can be subscribed only once. * Any subsequent attempt to subscribe it to a new source will yield an - * {@link IllegalStateException} with message {@code "Disposable already set!"}. + * {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) multiple times."}. * *

Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)} * and {@link #onComplete()} are not allowed to throw any unchecked exceptions. @@ -100,7 +101,7 @@ public final void add(@NonNull Disposable resource) { @Override public final void onSubscribe(Disposable s) { - if (DisposableHelper.setOnce(this.s, s)) { + if (EndConsumerHelper.setOnce(this.s, s, getClass())) { onStart(); } } diff --git a/src/main/java/io/reactivex/observers/ResourceSingleObserver.java b/src/main/java/io/reactivex/observers/ResourceSingleObserver.java index 4c18a139c2..d2cb8b7ce7 100644 --- a/src/main/java/io/reactivex/observers/ResourceSingleObserver.java +++ b/src/main/java/io/reactivex/observers/ResourceSingleObserver.java @@ -20,6 +20,7 @@ import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.*; import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.util.EndConsumerHelper; /** * An abstract {@link SingleObserver} that allows asynchronous cancellation of its subscription @@ -45,7 +46,7 @@ * *

Like all other consumers, {@code ResourceSingleObserver} can be subscribed only once. * Any subsequent attempt to subscribe it to a new source will yield an - * {@link IllegalStateException} with message {@code "Disposable already set!"}. + * {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) multiple times."}. * *

Implementation of {@link #onStart()}, {@link #onSuccess(Object)} and {@link #onError(Throwable)} * are not allowed to throw any unchecked exceptions. @@ -95,7 +96,7 @@ public final void add(@NonNull Disposable resource) { @Override public final void onSubscribe(@NonNull Disposable s) { - if (DisposableHelper.setOnce(this.s, s)) { + if (EndConsumerHelper.setOnce(this.s, s, getClass())) { onStart(); } } diff --git a/src/main/java/io/reactivex/subscribers/DefaultSubscriber.java b/src/main/java/io/reactivex/subscribers/DefaultSubscriber.java index 5a5dff485f..dbd27242d7 100644 --- a/src/main/java/io/reactivex/subscribers/DefaultSubscriber.java +++ b/src/main/java/io/reactivex/subscribers/DefaultSubscriber.java @@ -17,6 +17,7 @@ import io.reactivex.FlowableSubscriber; import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.EndConsumerHelper; /** * Abstract base implementation of a {@link org.reactivestreams.Subscriber Subscriber} with @@ -40,7 +41,7 @@ * *

Like all other consumers, {@code DefaultSubscriber} can be subscribed only once. * Any subsequent attempt to subscribe it to a new source will yield an - * {@link IllegalStateException} with message {@code "Subscription already set!"}. + * {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) multiple times."}. * *

Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)} * and {@link #onComplete()} are not allowed to throw any unchecked exceptions. @@ -78,7 +79,7 @@ public abstract class DefaultSubscriber implements FlowableSubscriber { private Subscription s; @Override public final void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.s, s)) { + if (EndConsumerHelper.validate(this.s, s, getClass())) { this.s = s; onStart(); } diff --git a/src/main/java/io/reactivex/subscribers/DisposableSubscriber.java b/src/main/java/io/reactivex/subscribers/DisposableSubscriber.java index cebac57baa..17c55976a7 100644 --- a/src/main/java/io/reactivex/subscribers/DisposableSubscriber.java +++ b/src/main/java/io/reactivex/subscribers/DisposableSubscriber.java @@ -20,6 +20,7 @@ import io.reactivex.FlowableSubscriber; import io.reactivex.disposables.Disposable; import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.EndConsumerHelper; /** * An abstract Subscriber that allows asynchronous, external cancellation by implementing Disposable. @@ -39,7 +40,7 @@ * *

Like all other consumers, {@code DisposableSubscriber} can be subscribed only once. * Any subsequent attempt to subscribe it to a new source will yield an - * {@link IllegalStateException} with message {@code "Subscription already set!"}. + * {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) multiple times."}. * *

Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)} * and {@link #onComplete()} are not allowed to throw any unchecked exceptions. @@ -77,7 +78,7 @@ public abstract class DisposableSubscriber implements FlowableSubscriber, @Override public final void onSubscribe(Subscription s) { - if (SubscriptionHelper.setOnce(this.s, s)) { + if (EndConsumerHelper.setOnce(this.s, s, getClass())) { onStart(); } } diff --git a/src/main/java/io/reactivex/subscribers/ResourceSubscriber.java b/src/main/java/io/reactivex/subscribers/ResourceSubscriber.java index 986c0d5ce2..930b3ab76b 100644 --- a/src/main/java/io/reactivex/subscribers/ResourceSubscriber.java +++ b/src/main/java/io/reactivex/subscribers/ResourceSubscriber.java @@ -22,6 +22,7 @@ import io.reactivex.internal.disposables.ListCompositeDisposable; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.EndConsumerHelper; /** * An abstract Subscriber that allows asynchronous cancellation of its @@ -52,7 +53,7 @@ * *

Like all other consumers, {@code ResourceSubscriber} can be subscribed only once. * Any subsequent attempt to subscribe it to a new source will yield an - * {@link IllegalStateException} with message {@code "Subscription already set!"}. + * {@link IllegalStateException} with message {@code "It is not allowed to subscribe with a(n) multiple times."}. * *

Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)} * and {@link #onComplete()} are not allowed to throw any unchecked exceptions. @@ -115,7 +116,11 @@ public final void add(Disposable resource) { @Override public final void onSubscribe(Subscription s) { - if (SubscriptionHelper.deferredSetOnce(this.s, missedRequested, s)) { + if (EndConsumerHelper.setOnce(this.s, s, getClass())) { + long r = missedRequested.getAndSet(0L); + if (r != 0L) { + s.request(r); + } onStart(); } } diff --git a/src/test/java/io/reactivex/internal/disposables/DisposableHelperTest.java b/src/test/java/io/reactivex/internal/disposables/DisposableHelperTest.java index 74af32378a..941e082101 100644 --- a/src/test/java/io/reactivex/internal/disposables/DisposableHelperTest.java +++ b/src/test/java/io/reactivex/internal/disposables/DisposableHelperTest.java @@ -118,4 +118,29 @@ public void dispose() { assertTrue(u.isDisposed()); } + + @Test + public void trySet() { + AtomicReference ref = new AtomicReference(); + + Disposable d1 = Disposables.empty(); + + assertTrue(DisposableHelper.trySet(ref, d1)); + + Disposable d2 = Disposables.empty(); + + assertFalse(DisposableHelper.trySet(ref, d2)); + + assertFalse(d1.isDisposed()); + + assertFalse(d2.isDisposed()); + + DisposableHelper.dispose(ref); + + Disposable d3 = Disposables.empty(); + + assertFalse(DisposableHelper.trySet(ref, d3)); + + assertTrue(d3.isDisposed()); + } } diff --git a/src/test/java/io/reactivex/internal/util/EndConsumerHelperTest.java b/src/test/java/io/reactivex/internal/util/EndConsumerHelperTest.java new file mode 100644 index 0000000000..fa07598d30 --- /dev/null +++ b/src/test/java/io/reactivex/internal/util/EndConsumerHelperTest.java @@ -0,0 +1,493 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.util; + +import static org.junit.Assert.*; + +import java.util.List; + +import org.junit.*; +import org.reactivestreams.Subscriber; + +import io.reactivex.*; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.ProtocolViolationException; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.subscriptions.*; +import io.reactivex.observers.*; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subscribers.*; + +public class EndConsumerHelperTest { + + List errors; + + @Before + public void before() { + errors = TestHelper.trackPluginErrors(); + } + + @After + public void after() { + RxJavaPlugins.reset(); + } + + @Test + public void utilityClass() { + TestHelper.checkUtilityClass(EndConsumerHelper.class); + } + + @Test + public void checkDoubleDefaultSubscriber() { + Subscriber consumer = new DefaultSubscriber() { + @Override + public void onNext(Integer t) { + } + @Override + public void onError(Throwable t) { + } + @Override + public void onComplete() { + } + }; + + BooleanSubscription sub1 = new BooleanSubscription(); + + consumer.onSubscribe(sub1); + + assertFalse(sub1.isCancelled()); + + BooleanSubscription sub2 = new BooleanSubscription(); + + consumer.onSubscribe(sub2); + + assertFalse(sub1.isCancelled()); + + assertTrue(sub2.isCancelled()); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + assertEquals(EndConsumerHelper.composeMessage(consumer.getClass().getName()), errors.get(0).getMessage()); + assertEquals(errors.toString(), 1, errors.size()); + } + + static final class EndDefaultSubscriber extends DefaultSubscriber { + @Override + public void onNext(Integer t) { + } + @Override + public void onError(Throwable t) { + } + @Override + public void onComplete() { + } + } + + @Test + public void checkDoubleDefaultSubscriberNonAnonymous() { + Subscriber consumer = new EndDefaultSubscriber(); + + BooleanSubscription sub1 = new BooleanSubscription(); + + consumer.onSubscribe(sub1); + + assertFalse(sub1.isCancelled()); + + BooleanSubscription sub2 = new BooleanSubscription(); + + consumer.onSubscribe(sub2); + + assertFalse(sub1.isCancelled()); + + assertTrue(sub2.isCancelled()); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + + // with this consumer, the class name should be predictable + assertEquals(EndConsumerHelper.composeMessage("io.reactivex.internal.util.EndConsumerHelperTest$EndDefaultSubscriber"), errors.get(0).getMessage()); + assertEquals(errors.toString(), 1, errors.size()); + } + + @Test + public void checkDoubleDisposableSubscriber() { + Subscriber consumer = new DisposableSubscriber() { + @Override + public void onNext(Integer t) { + } + @Override + public void onError(Throwable t) { + } + @Override + public void onComplete() { + } + }; + + BooleanSubscription sub1 = new BooleanSubscription(); + + consumer.onSubscribe(sub1); + + assertFalse(sub1.isCancelled()); + + BooleanSubscription sub2 = new BooleanSubscription(); + + consumer.onSubscribe(sub2); + + assertFalse(sub1.isCancelled()); + + assertTrue(sub2.isCancelled()); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + assertEquals(EndConsumerHelper.composeMessage(consumer.getClass().getName()), errors.get(0).getMessage()); + assertEquals(errors.toString(), 1, errors.size()); + } + + @Test + public void checkDoubleResourceSubscriber() { + Subscriber consumer = new ResourceSubscriber() { + @Override + public void onNext(Integer t) { + } + @Override + public void onError(Throwable t) { + } + @Override + public void onComplete() { + } + }; + + BooleanSubscription sub1 = new BooleanSubscription(); + + consumer.onSubscribe(sub1); + + assertFalse(sub1.isCancelled()); + + BooleanSubscription sub2 = new BooleanSubscription(); + + consumer.onSubscribe(sub2); + + assertFalse(sub1.isCancelled()); + + assertTrue(sub2.isCancelled()); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + assertEquals(EndConsumerHelper.composeMessage(consumer.getClass().getName()), errors.get(0).getMessage()); + assertEquals(errors.toString(), 1, errors.size()); + } + + @Test + public void checkDoubleDefaultObserver() { + Observer consumer = new DefaultObserver() { + @Override + public void onNext(Integer t) { + } + @Override + public void onError(Throwable t) { + } + @Override + public void onComplete() { + } + }; + + Disposable sub1 = Disposables.empty(); + + consumer.onSubscribe(sub1); + + assertFalse(sub1.isDisposed()); + + Disposable sub2 = Disposables.empty(); + + consumer.onSubscribe(sub2); + + assertFalse(sub1.isDisposed()); + + assertTrue(sub2.isDisposed()); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + assertEquals(EndConsumerHelper.composeMessage(consumer.getClass().getName()), errors.get(0).getMessage()); + assertEquals(errors.toString(), 1, errors.size()); + } + + @Test + public void checkDoubleDisposableObserver() { + Observer consumer = new DisposableObserver() { + @Override + public void onNext(Integer t) { + } + @Override + public void onError(Throwable t) { + } + @Override + public void onComplete() { + } + }; + + Disposable sub1 = Disposables.empty(); + + consumer.onSubscribe(sub1); + + assertFalse(sub1.isDisposed()); + + Disposable sub2 = Disposables.empty(); + + consumer.onSubscribe(sub2); + + assertFalse(sub1.isDisposed()); + + assertTrue(sub2.isDisposed()); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + assertEquals(EndConsumerHelper.composeMessage(consumer.getClass().getName()), errors.get(0).getMessage()); + assertEquals(errors.toString(), 1, errors.size()); + } + + @Test + public void checkDoubleResourceObserver() { + Observer consumer = new ResourceObserver() { + @Override + public void onNext(Integer t) { + } + @Override + public void onError(Throwable t) { + } + @Override + public void onComplete() { + } + }; + + Disposable sub1 = Disposables.empty(); + + consumer.onSubscribe(sub1); + + assertFalse(sub1.isDisposed()); + + Disposable sub2 = Disposables.empty(); + + consumer.onSubscribe(sub2); + + assertFalse(sub1.isDisposed()); + + assertTrue(sub2.isDisposed()); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + assertEquals(EndConsumerHelper.composeMessage(consumer.getClass().getName()), errors.get(0).getMessage()); + assertEquals(errors.toString(), 1, errors.size()); + } + + @Test + public void checkDoubleDisposableSingleObserver() { + SingleObserver consumer = new DisposableSingleObserver() { + @Override + public void onSuccess(Integer t) { + } + @Override + public void onError(Throwable t) { + } + }; + + Disposable sub1 = Disposables.empty(); + + consumer.onSubscribe(sub1); + + assertFalse(sub1.isDisposed()); + + Disposable sub2 = Disposables.empty(); + + consumer.onSubscribe(sub2); + + assertFalse(sub1.isDisposed()); + + assertTrue(sub2.isDisposed()); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + assertEquals(EndConsumerHelper.composeMessage(consumer.getClass().getName()), errors.get(0).getMessage()); + assertEquals(errors.toString(), 1, errors.size()); + } + + @Test + public void checkDoubleResourceSingleObserver() { + SingleObserver consumer = new ResourceSingleObserver() { + @Override + public void onSuccess(Integer t) { + } + @Override + public void onError(Throwable t) { + } + }; + + Disposable sub1 = Disposables.empty(); + + consumer.onSubscribe(sub1); + + assertFalse(sub1.isDisposed()); + + Disposable sub2 = Disposables.empty(); + + consumer.onSubscribe(sub2); + + assertFalse(sub1.isDisposed()); + + assertTrue(sub2.isDisposed()); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + assertEquals(EndConsumerHelper.composeMessage(consumer.getClass().getName()), errors.get(0).getMessage()); + assertEquals(errors.toString(), 1, errors.size()); + } + + @Test + public void checkDoubleDisposableMaybeObserver() { + MaybeObserver consumer = new DisposableMaybeObserver() { + @Override + public void onSuccess(Integer t) { + } + @Override + public void onError(Throwable t) { + } + @Override + public void onComplete() { + } + }; + + Disposable sub1 = Disposables.empty(); + + consumer.onSubscribe(sub1); + + assertFalse(sub1.isDisposed()); + + Disposable sub2 = Disposables.empty(); + + consumer.onSubscribe(sub2); + + assertFalse(sub1.isDisposed()); + + assertTrue(sub2.isDisposed()); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + assertEquals(EndConsumerHelper.composeMessage(consumer.getClass().getName()), errors.get(0).getMessage()); + assertEquals(errors.toString(), 1, errors.size()); + } + + @Test + public void checkDoubleResourceMaybeObserver() { + MaybeObserver consumer = new ResourceMaybeObserver() { + @Override + public void onSuccess(Integer t) { + } + @Override + public void onError(Throwable t) { + } + @Override + public void onComplete() { + } + }; + + Disposable sub1 = Disposables.empty(); + + consumer.onSubscribe(sub1); + + assertFalse(sub1.isDisposed()); + + Disposable sub2 = Disposables.empty(); + + consumer.onSubscribe(sub2); + + assertFalse(sub1.isDisposed()); + + assertTrue(sub2.isDisposed()); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + assertEquals(EndConsumerHelper.composeMessage(consumer.getClass().getName()), errors.get(0).getMessage()); + assertEquals(errors.toString(), 1, errors.size()); + } + + @Test + public void checkDoubleDisposableCompletableObserver() { + CompletableObserver consumer = new DisposableCompletableObserver() { + @Override + public void onError(Throwable t) { + } + @Override + public void onComplete() { + } + }; + + Disposable sub1 = Disposables.empty(); + + consumer.onSubscribe(sub1); + + assertFalse(sub1.isDisposed()); + + Disposable sub2 = Disposables.empty(); + + consumer.onSubscribe(sub2); + + assertFalse(sub1.isDisposed()); + + assertTrue(sub2.isDisposed()); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + assertEquals(EndConsumerHelper.composeMessage(consumer.getClass().getName()), errors.get(0).getMessage()); + assertEquals(errors.toString(), 1, errors.size()); + } + + @Test + public void checkDoubleResourceCompletableObserver() { + CompletableObserver consumer = new ResourceCompletableObserver() { + @Override + public void onError(Throwable t) { + } + @Override + public void onComplete() { + } + }; + + Disposable sub1 = Disposables.empty(); + + consumer.onSubscribe(sub1); + + assertFalse(sub1.isDisposed()); + + Disposable sub2 = Disposables.empty(); + + consumer.onSubscribe(sub2); + + assertFalse(sub1.isDisposed()); + + assertTrue(sub2.isDisposed()); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + assertEquals(EndConsumerHelper.composeMessage(consumer.getClass().getName()), errors.get(0).getMessage()); + assertEquals(errors.toString(), 1, errors.size()); + } + + @Test + public void validateDisposable() { + Disposable d1 = Disposables.empty(); + + assertFalse(EndConsumerHelper.validate(DisposableHelper.DISPOSED, d1, getClass())); + + assertTrue(d1.isDisposed()); + + assertTrue(errors.toString(), errors.isEmpty()); + } + + @Test + public void validateSubscription() { + BooleanSubscription d1 = new BooleanSubscription(); + + assertFalse(EndConsumerHelper.validate(SubscriptionHelper.CANCELLED, d1, getClass())); + + assertTrue(d1.isCancelled()); + + assertTrue(errors.toString(), errors.isEmpty()); + } +} diff --git a/src/test/java/io/reactivex/observers/DisposableCompletableObserverTest.java b/src/test/java/io/reactivex/observers/DisposableCompletableObserverTest.java index a1cf09afad..035b9ef6c6 100644 --- a/src/test/java/io/reactivex/observers/DisposableCompletableObserverTest.java +++ b/src/test/java/io/reactivex/observers/DisposableCompletableObserverTest.java @@ -21,6 +21,7 @@ import io.reactivex.*; import io.reactivex.disposables.*; +import io.reactivex.internal.util.EndConsumerHelper; import io.reactivex.observers.DisposableCompletableObserver; import io.reactivex.plugins.RxJavaPlugins; @@ -88,7 +89,7 @@ public void startOnce() { assertEquals(1, tc.start); - TestHelper.assertError(error, 0, IllegalStateException.class, "Disposable already set!"); + TestHelper.assertError(error, 0, IllegalStateException.class, EndConsumerHelper.composeMessage(tc.getClass().getName())); } finally { RxJavaPlugins.reset(); } diff --git a/src/test/java/io/reactivex/observers/DisposableMaybeObserverTest.java b/src/test/java/io/reactivex/observers/DisposableMaybeObserverTest.java index b8a7f3d44e..547d8314bc 100644 --- a/src/test/java/io/reactivex/observers/DisposableMaybeObserverTest.java +++ b/src/test/java/io/reactivex/observers/DisposableMaybeObserverTest.java @@ -21,6 +21,7 @@ import io.reactivex.*; import io.reactivex.disposables.*; +import io.reactivex.internal.util.EndConsumerHelper; import io.reactivex.observers.DisposableMaybeObserver; import io.reactivex.plugins.RxJavaPlugins; @@ -96,7 +97,7 @@ public void startOnce() { assertEquals(1, tc.start); - TestHelper.assertError(error, 0, IllegalStateException.class, "Disposable already set!"); + TestHelper.assertError(error, 0, IllegalStateException.class, EndConsumerHelper.composeMessage(tc.getClass().getName())); } finally { RxJavaPlugins.reset(); } diff --git a/src/test/java/io/reactivex/observers/DisposableObserverTest.java b/src/test/java/io/reactivex/observers/DisposableObserverTest.java index 98f82e0dda..cfefc99fea 100644 --- a/src/test/java/io/reactivex/observers/DisposableObserverTest.java +++ b/src/test/java/io/reactivex/observers/DisposableObserverTest.java @@ -22,6 +22,7 @@ import io.reactivex.Observable; import io.reactivex.TestHelper; import io.reactivex.disposables.*; +import io.reactivex.internal.util.EndConsumerHelper; import io.reactivex.plugins.RxJavaPlugins; public class DisposableObserverTest { @@ -94,7 +95,7 @@ public void startOnce() { assertEquals(1, tc.start); - TestHelper.assertError(error, 0, IllegalStateException.class, "Disposable already set!"); + TestHelper.assertError(error, 0, IllegalStateException.class, EndConsumerHelper.composeMessage(tc.getClass().getName())); } finally { RxJavaPlugins.reset(); } diff --git a/src/test/java/io/reactivex/observers/DisposableSingleObserverTest.java b/src/test/java/io/reactivex/observers/DisposableSingleObserverTest.java index 0193bc7d16..a1dfd2a299 100644 --- a/src/test/java/io/reactivex/observers/DisposableSingleObserverTest.java +++ b/src/test/java/io/reactivex/observers/DisposableSingleObserverTest.java @@ -21,6 +21,7 @@ import io.reactivex.*; import io.reactivex.disposables.*; +import io.reactivex.internal.util.EndConsumerHelper; import io.reactivex.observers.DisposableSingleObserver; import io.reactivex.plugins.RxJavaPlugins; @@ -88,7 +89,7 @@ public void startOnce() { assertEquals(1, tc.start); - TestHelper.assertError(error, 0, IllegalStateException.class, "Disposable already set!"); + TestHelper.assertError(error, 0, IllegalStateException.class, EndConsumerHelper.composeMessage(tc.getClass().getName())); } finally { RxJavaPlugins.reset(); } diff --git a/src/test/java/io/reactivex/observers/ResourceCompletableObserverTest.java b/src/test/java/io/reactivex/observers/ResourceCompletableObserverTest.java index b0f1fff2eb..66669e5f2d 100644 --- a/src/test/java/io/reactivex/observers/ResourceCompletableObserverTest.java +++ b/src/test/java/io/reactivex/observers/ResourceCompletableObserverTest.java @@ -23,6 +23,7 @@ import io.reactivex.disposables.Disposable; import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.TestException; +import io.reactivex.internal.util.EndConsumerHelper; import io.reactivex.plugins.RxJavaPlugins; import static org.junit.Assert.assertEquals; @@ -180,7 +181,7 @@ public void startOnce() { assertEquals(1, rco.start); - TestHelper.assertError(error, 0, IllegalStateException.class, "Disposable already set!"); + TestHelper.assertError(error, 0, IllegalStateException.class, EndConsumerHelper.composeMessage(rco.getClass().getName())); } finally { RxJavaPlugins.reset(); } diff --git a/src/test/java/io/reactivex/observers/ResourceMaybeObserverTest.java b/src/test/java/io/reactivex/observers/ResourceMaybeObserverTest.java index fa6dd85687..af466153ec 100644 --- a/src/test/java/io/reactivex/observers/ResourceMaybeObserverTest.java +++ b/src/test/java/io/reactivex/observers/ResourceMaybeObserverTest.java @@ -23,6 +23,7 @@ import io.reactivex.disposables.Disposable; import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.TestException; +import io.reactivex.internal.util.EndConsumerHelper; import io.reactivex.plugins.RxJavaPlugins; import static org.junit.Assert.assertEquals; @@ -231,7 +232,7 @@ public void startOnce() { assertEquals(1, rmo.start); - TestHelper.assertError(error, 0, IllegalStateException.class, "Disposable already set!"); + TestHelper.assertError(error, 0, IllegalStateException.class, EndConsumerHelper.composeMessage(rmo.getClass().getName())); } finally { RxJavaPlugins.reset(); } diff --git a/src/test/java/io/reactivex/observers/ResourceObserverTest.java b/src/test/java/io/reactivex/observers/ResourceObserverTest.java index 97f76d9c82..76f50c7bf5 100644 --- a/src/test/java/io/reactivex/observers/ResourceObserverTest.java +++ b/src/test/java/io/reactivex/observers/ResourceObserverTest.java @@ -23,6 +23,7 @@ import io.reactivex.disposables.Disposable; import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.TestException; +import io.reactivex.internal.util.EndConsumerHelper; import io.reactivex.plugins.RxJavaPlugins; import static org.junit.Assert.assertEquals; @@ -190,7 +191,7 @@ public void startOnce() { assertEquals(1, tc.start); - TestHelper.assertError(error, 0, IllegalStateException.class, "Disposable already set!"); + TestHelper.assertError(error, 0, IllegalStateException.class, EndConsumerHelper.composeMessage(tc.getClass().getName())); } finally { RxJavaPlugins.reset(); } diff --git a/src/test/java/io/reactivex/observers/ResourceSingleObserverTest.java b/src/test/java/io/reactivex/observers/ResourceSingleObserverTest.java index b496b7dd55..0c3a50de7c 100644 --- a/src/test/java/io/reactivex/observers/ResourceSingleObserverTest.java +++ b/src/test/java/io/reactivex/observers/ResourceSingleObserverTest.java @@ -23,6 +23,7 @@ import io.reactivex.disposables.Disposable; import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.TestException; +import io.reactivex.internal.util.EndConsumerHelper; import io.reactivex.plugins.RxJavaPlugins; import static org.junit.Assert.assertEquals; @@ -183,7 +184,7 @@ public void startOnce() { assertEquals(1, rso.start); - TestHelper.assertError(error, 0, IllegalStateException.class, "Disposable already set!"); + TestHelper.assertError(error, 0, IllegalStateException.class, EndConsumerHelper.composeMessage(rso.getClass().getName())); } finally { RxJavaPlugins.reset(); } diff --git a/src/test/java/io/reactivex/subscribers/DefaultSubscriberTest.java b/src/test/java/io/reactivex/subscribers/DefaultSubscriberTest.java new file mode 100644 index 0000000000..99a23a5c53 --- /dev/null +++ b/src/test/java/io/reactivex/subscribers/DefaultSubscriberTest.java @@ -0,0 +1,64 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.subscribers; + +import static org.junit.Assert.assertEquals; + +import java.util.*; + +import org.junit.Test; + +import io.reactivex.Flowable; + +public class DefaultSubscriberTest { + + static final class RequestEarly extends DefaultSubscriber { + + final List events = new ArrayList(); + + RequestEarly() { + request(5); + } + + @Override + protected void onStart() { + } + + @Override + public void onNext(Integer t) { + events.add(t); + } + + @Override + public void onError(Throwable t) { + events.add(t); + } + + @Override + public void onComplete() { + events.add("Done"); + } + + } + + @Test + public void requestUpfront() { + RequestEarly sub = new RequestEarly(); + + Flowable.range(1, 10).subscribe(sub); + + assertEquals(Collections.emptyList(), sub.events); + } + +} diff --git a/src/test/java/io/reactivex/subscribers/DisposableSubscriberTest.java b/src/test/java/io/reactivex/subscribers/DisposableSubscriberTest.java index e4ce2b9916..8e49bdea3d 100644 --- a/src/test/java/io/reactivex/subscribers/DisposableSubscriberTest.java +++ b/src/test/java/io/reactivex/subscribers/DisposableSubscriberTest.java @@ -21,6 +21,7 @@ import io.reactivex.*; import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.internal.util.EndConsumerHelper; import io.reactivex.plugins.RxJavaPlugins; public class DisposableSubscriberTest { @@ -93,7 +94,7 @@ public void startOnce() { assertEquals(1, tc.start); - TestHelper.assertError(error, 0, IllegalStateException.class, "Subscription already set!"); + TestHelper.assertError(error, 0, IllegalStateException.class, EndConsumerHelper.composeMessage(tc.getClass().getName())); } finally { RxJavaPlugins.reset(); } diff --git a/src/test/java/io/reactivex/subscribers/ResourceSubscriberTest.java b/src/test/java/io/reactivex/subscribers/ResourceSubscriberTest.java index a4411e10c7..1623765865 100644 --- a/src/test/java/io/reactivex/subscribers/ResourceSubscriberTest.java +++ b/src/test/java/io/reactivex/subscribers/ResourceSubscriberTest.java @@ -23,6 +23,7 @@ import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.internal.util.EndConsumerHelper; import io.reactivex.plugins.RxJavaPlugins; public class ResourceSubscriberTest { @@ -171,7 +172,7 @@ public void startOnce() { assertEquals(1, tc.start); - TestHelper.assertError(error, 0, IllegalStateException.class, "Subscription already set!"); + TestHelper.assertError(error, 0, IllegalStateException.class, EndConsumerHelper.composeMessage(tc.getClass().getName())); } finally { RxJavaPlugins.reset(); } @@ -214,4 +215,42 @@ protected void onStart() { assertTrue(tc.errors.isEmpty()); assertEquals(1, tc.complete); } + + static final class RequestEarly extends ResourceSubscriber { + + final List events = new ArrayList(); + + RequestEarly() { + request(5); + } + + @Override + protected void onStart() { + } + + @Override + public void onNext(Integer t) { + events.add(t); + } + + @Override + public void onError(Throwable t) { + events.add(t); + } + + @Override + public void onComplete() { + events.add("Done"); + } + + } + + @Test + public void requestUpfront() { + RequestEarly sub = new RequestEarly(); + + Flowable.range(1, 10).subscribe(sub); + + assertEquals(Arrays.asList(1, 2, 3, 4, 5), sub.events); + } }