diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGenerate.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGenerate.java index 0520670a93..5b7a2a796a 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGenerate.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGenerate.java @@ -68,6 +68,8 @@ static final class GeneratorSubscription boolean terminate; + boolean hasNext; + GeneratorSubscription(Subscriber actual, BiFunction, S> generator, Consumer disposeState, S initialState) { @@ -96,21 +98,27 @@ public void request(long n) { while (e != n) { if (cancelled) { + state = null; dispose(s); return; } + hasNext = false; + try { s = f.apply(s, this); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); cancelled = true; - actual.onError(ex); + state = null; + onError(ex); + dispose(s); return; } if (terminate) { cancelled = true; + state = null; dispose(s); return; } @@ -146,33 +154,48 @@ public void cancel() { // if there are no running requests, just dispose the state if (BackpressureHelper.add(this, 1) == 0) { - dispose(state); + S s = state; + state = null; + dispose(s); } } } @Override public void onNext(T t) { - if (t == null) { - onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); - return; + if (!terminate) { + if (hasNext) { + onError(new IllegalStateException("onNext already called in this generate turn")); + } else { + if (t == null) { + onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); + } else { + hasNext = true; + actual.onNext(t); + } + } } - actual.onNext(t); } @Override public void onError(Throwable t) { - if (t == null) { - t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); + if (terminate) { + RxJavaPlugins.onError(t); + } else { + if (t == null) { + t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); + } + terminate = true; + actual.onError(t); } - terminate = true; - actual.onError(t); } @Override public void onComplete() { - terminate = true; - actual.onComplete(); + if (!terminate) { + terminate = true; + actual.onComplete(); + } } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableGenerate.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableGenerate.java index f5bcc6516d..50a38cf29f 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableGenerate.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableGenerate.java @@ -64,6 +64,8 @@ static final class GeneratorDisposable boolean terminate; + boolean hasNext; + GeneratorDisposable(Observer actual, BiFunction, S> generator, Consumer disposeState, S initialState) { @@ -92,13 +94,16 @@ public void run() { return; } + hasNext = false; + try { s = f.apply(s, this); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); state = null; cancelled = true; - actual.onError(ex); + onError(ex); + dispose(s); return; } @@ -131,28 +136,42 @@ public boolean isDisposed() { return cancelled; } + @Override public void onNext(T t) { - if (t == null) { - onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); - return; + if (!terminate) { + if (hasNext) { + onError(new IllegalStateException("onNext already called in this generate turn")); + } else { + if (t == null) { + onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); + } else { + hasNext = true; + actual.onNext(t); + } + } } - actual.onNext(t); } @Override public void onError(Throwable t) { - if (t == null) { - t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); + if (terminate) { + RxJavaPlugins.onError(t); + } else { + if (t == null) { + t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); + } + terminate = true; + actual.onError(t); } - terminate = true; - actual.onError(t); } @Override public void onComplete() { - terminate = true; - actual.onComplete(); + if (!terminate) { + terminate = true; + actual.onComplete(); + } } } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableGenerateTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableGenerateTest.java index 6166c8f38d..aaf268fb5e 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableGenerateTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableGenerateTest.java @@ -236,4 +236,50 @@ public void run() { ts.assertValueCount(1000); } } + + @Test + public void multipleOnNext() { + Flowable.generate(new Consumer>() { + @Override + public void accept(Emitter e) throws Exception { + e.onNext(1); + e.onNext(2); + } + }) + .test(1) + .assertFailure(IllegalStateException.class, 1); + } + + @Test + public void multipleOnError() { + List errors = TestHelper.trackPluginErrors(); + try { + Flowable.generate(new Consumer>() { + @Override + public void accept(Emitter e) throws Exception { + e.onError(new TestException("First")); + e.onError(new TestException("Second")); + } + }) + .test(1) + .assertFailure(TestException.class); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void multipleOnComplete() { + Flowable.generate(new Consumer>() { + @Override + public void accept(Emitter e) throws Exception { + e.onComplete(); + e.onComplete(); + } + }) + .test(1) + .assertResult(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableGenerateTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableGenerateTest.java index 7235eb3f13..de4880b579 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableGenerateTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableGenerateTest.java @@ -147,4 +147,50 @@ public void accept(Integer s, Emitter e) throws Exception { assertEquals(0, call[0]); } + + @Test + public void multipleOnNext() { + Observable.generate(new Consumer>() { + @Override + public void accept(Emitter e) throws Exception { + e.onNext(1); + e.onNext(2); + } + }) + .test() + .assertFailure(IllegalStateException.class, 1); + } + + @Test + public void multipleOnError() { + List errors = TestHelper.trackPluginErrors(); + try { + Observable.generate(new Consumer>() { + @Override + public void accept(Emitter e) throws Exception { + e.onError(new TestException("First")); + e.onError(new TestException("Second")); + } + }) + .test() + .assertFailure(TestException.class); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void multipleOnComplete() { + Observable.generate(new Consumer>() { + @Override + public void accept(Emitter e) throws Exception { + e.onComplete(); + e.onComplete(); + } + }) + .test() + .assertResult(); + } }