From 67ef32c2c3b28cf3dd40e999bc836deca52fda20 Mon Sep 17 00:00:00 2001 From: Artem Zinnatullin Date: Fri, 29 Jan 2016 04:36:34 +0300 Subject: [PATCH] Add Single.onErrorResumeNext(Single) --- src/main/java/rx/Single.java | 31 +++++++++++++ ...gleOperatorOnErrorResumeNextViaSingle.java | 45 +++++++++++++++++++ src/test/java/rx/SingleTest.java | 42 +++++++++++++++-- 3 files changed, 115 insertions(+), 3 deletions(-) create mode 100644 src/main/java/rx/internal/operators/SingleOperatorOnErrorResumeNextViaSingle.java diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index fb1fed35d7..96ac18a25a 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -1411,6 +1411,37 @@ public final Single onErrorReturn(Func1 resumeFunctio return lift(new OperatorOnErrorReturn(resumeFunction)); } + /** + * Instructs a Single to pass control to another Single rather than invoking + * {@link Observer#onError(Throwable)} if it encounters an error. + *

+ * + *

+ * By default, when a Single encounters an error that prevents it from emitting the expected item to + * its {@link Observer}, the Single invokes its Observer's {@code onError} method, and then quits + * without invoking any more of its Observer's methods. The {@code onErrorResumeNext} method changes this + * behavior. If you pass another Single ({@code resumeSingleInCaseOfError}) to an Single's + * {@code onErrorResumeNext} method, if the original Single encounters an error, instead of invoking its + * Observer's {@code onError} method, it will instead relinquish control to {@code resumeSingleInCaseOfError} which + * will invoke the Observer's {@link Observer#onNext onNext} method if it is able to do so. In such a case, + * because no Single necessarily invokes {@code onError}, the Observer may never know that an error + * happened. + *

+ * You can use this to prevent errors from propagating or to supply fallback data should errors be + * encountered. + *

+ *
Scheduler:
+ *
{@code onErrorResumeNext} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param resumeSingleInCaseOfError a Single that will take control if source Single encounters an error. + * @return the original Single, with appropriately modified behavior. + * @see ReactiveX operators documentation: Catch + */ + public final Single onErrorResumeNext(Single resumeSingleInCaseOfError) { + return new Single(new SingleOperatorOnErrorResumeNextViaSingle(this, resumeSingleInCaseOfError)); + } + /** * Subscribes to a Single but ignore its emission or notification. *
diff --git a/src/main/java/rx/internal/operators/SingleOperatorOnErrorResumeNextViaSingle.java b/src/main/java/rx/internal/operators/SingleOperatorOnErrorResumeNextViaSingle.java new file mode 100644 index 0000000000..ca47f9c3e9 --- /dev/null +++ b/src/main/java/rx/internal/operators/SingleOperatorOnErrorResumeNextViaSingle.java @@ -0,0 +1,45 @@ +package rx.internal.operators; + +import rx.Single; +import rx.SingleSubscriber; +import rx.plugins.RxJavaPlugins; + +public class SingleOperatorOnErrorResumeNextViaSingle implements Single.OnSubscribe { + + private final Single originalSingle; + private final Single resumeSingleInCaseOfError; + + public SingleOperatorOnErrorResumeNextViaSingle(Single originalSingle, Single resumeSingleInCaseOfError) { + if (originalSingle == null) { + throw new NullPointerException("originalSingle must not be null"); + } + + if (resumeSingleInCaseOfError == null) { + throw new NullPointerException("resumeSingleInCaseOfError must not be null"); + } + + this.originalSingle = originalSingle; + this.resumeSingleInCaseOfError = resumeSingleInCaseOfError; + } + + @Override + public void call(final SingleSubscriber child) { + final SingleSubscriber parent = new SingleSubscriber() { + @Override + public void onSuccess(T value) { + child.onSuccess(value); + } + + @Override + public void onError(Throwable error) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(error); + unsubscribe(); + + resumeSingleInCaseOfError.subscribe(child); + } + }; + + child.add(parent); + originalSingle.subscribe(parent); + } +} diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java index b29fcb01af..15de891636 100644 --- a/src/test/java/rx/SingleTest.java +++ b/src/test/java/rx/SingleTest.java @@ -1,11 +1,11 @@ /** * Copyright 2015 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. @@ -1182,6 +1182,42 @@ public void doAfterTerminateActionShouldNotBeInvokedUntilSubscriberSubscribes() verifyZeroInteractions(action); } + @Test + public void onErrorResumeNextViaSingleShouldNotInterruptSuccessfulSingle() { + TestSubscriber testSubscriber = new TestSubscriber(); + + Single + .just("success") + .onErrorResumeNext(Single.just("fail")) + .subscribe(testSubscriber); + + testSubscriber.assertValue("success"); + } + + @Test + public void onErrorResumeNextViaSingleShouldResumeWithPassedSingleInCaseOfError() { + TestSubscriber testSubscriber = new TestSubscriber(); + + Single + .error(new RuntimeException("test exception")) + .onErrorResumeNext(Single.just("fallback")) + .subscribe(testSubscriber); + + testSubscriber.assertValue("fallback"); + } + + @Test + public void onErrorResumeNextViaSingleShouldPreventNullSingle() { + try { + Single + .just("value") + .onErrorResumeNext(null); + fail(); + } catch (NullPointerException expected) { + assertEquals("resumeSingleInCaseOfError must not be null", expected.getMessage()); + } + } + @Test(expected = NullPointerException.class) public void iterableToArrayShouldThrowNullPointerExceptionIfIterableNull() { Single.iterableToArray(null);