From 34b4b6b6614d2c37d11b595a468f1b1954485826 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Sun, 4 Dec 2016 20:07:53 +1100 Subject: [PATCH] Observable.scan no seed fix post-terminal behaviour (#4904) --- .../operators/observable/ObservableScan.java | 17 +++- .../observable/ObservableScanTest.java | 85 +++++++++++++++++++ 2 files changed, 101 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableScan.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableScan.java index 86f323b295..68caa7760f 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableScan.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableScan.java @@ -19,6 +19,7 @@ import io.reactivex.functions.BiFunction; import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.plugins.RxJavaPlugins; public final class ObservableScan extends AbstractObservableWithUpstream { final BiFunction accumulator; @@ -39,6 +40,8 @@ static final class ScanObserver implements Observer, Disposable { Disposable s; T value; + + boolean done; ScanObserver(Observer actual, BiFunction accumulator) { this.actual = actual; @@ -67,6 +70,9 @@ public boolean isDisposed() { @Override public void onNext(T t) { + if (done) { + return; + } final Observer a = actual; T v = value; if (v == null) { @@ -80,7 +86,7 @@ public void onNext(T t) { } catch (Throwable e) { Exceptions.throwIfFatal(e); s.dispose(); - a.onError(e); + onError(e); return; } @@ -91,11 +97,20 @@ public void onNext(T t) { @Override public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + done = true; actual.onError(t); } @Override public void onComplete() { + if (done) { + return; + } + done = true; actual.onComplete(); } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableScanTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableScanTest.java index 7bcecd4e66..12f53220f9 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableScanTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableScanTest.java @@ -19,6 +19,7 @@ import java.util.*; import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; @@ -26,9 +27,12 @@ import io.reactivex.*; import io.reactivex.Observable; import io.reactivex.Observer; +import io.reactivex.disposables.Disposable; +import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.observers.*; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subjects.PublishSubject; public class ObservableScanTest { @@ -300,4 +304,85 @@ public Object apply(Object a, Object b) throws Exception { } }, false, 1, 1, 0, 0); } + + @Test + public void testScanFunctionThrowsAndUpstreamErrorsDoesNotResultInTwoTerminalEvents() { + final RuntimeException err = new RuntimeException(); + final RuntimeException err2 = new RuntimeException(); + final List list = new CopyOnWriteArrayList(); + final Consumer errorConsumer = new Consumer() { + @Override + public void accept(Throwable t) throws Exception { + list.add(t); + }}; + try { + RxJavaPlugins.setErrorHandler(errorConsumer); + Observable.unsafeCreate(new ObservableSource() { + @Override + public void subscribe(Observer o) { + Disposable d = Disposables.empty(); + o.onSubscribe(d); + o.onNext(1); + o.onNext(2); + o.onError(err2); + }}) + .scan(new BiFunction() { + @Override + public Integer apply(Integer t1, Integer t2) throws Exception { + throw err; + }}) + .test() + .assertError(err) + .assertValue(1); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void testScanFunctionThrowsAndUpstreamCompletesDoesNotResultInTwoTerminalEvents() { + final RuntimeException err = new RuntimeException(); + Observable.unsafeCreate(new ObservableSource() { + @Override + public void subscribe(Observer o) { + Disposable d = Disposables.empty(); + o.onSubscribe(d); + o.onNext(1); + o.onNext(2); + o.onComplete(); + }}) + .scan(new BiFunction() { + @Override + public Integer apply(Integer t1, Integer t2) throws Exception { + throw err; + }}) + .test() + .assertError(err) + .assertValue(1); + } + + @Test + public void testScanFunctionThrowsAndUpstreamEmitsOnNextResultsInScanFunctionBeingCalledOnlyOnce() { + final RuntimeException err = new RuntimeException(); + final AtomicInteger count = new AtomicInteger(); + Observable.unsafeCreate(new ObservableSource() { + @Override + public void subscribe(Observer o) { + Disposable d = Disposables.empty(); + o.onSubscribe(d); + o.onNext(1); + o.onNext(2); + o.onNext(3); + }}) + .scan(new BiFunction() { + @Override + public Integer apply(Integer t1, Integer t2) throws Exception { + count.incrementAndGet(); + throw err; + }}) + .test() + .assertError(err) + .assertValue(1); + assertEquals(1, count.get()); + } } \ No newline at end of file