Skip to content

Commit

Permalink
2.x: add safeguards to generate() (#4932)
Browse files Browse the repository at this point in the history
* 2.x: add safeguards to generate()

* Fix typo in the test name
  • Loading branch information
akarnokd authored Dec 21, 2016
1 parent a902d4a commit 4851637
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ static final class GeneratorSubscription<T, S>

boolean terminate;

boolean hasNext;

GeneratorSubscription(Subscriber<? super T> actual,
BiFunction<S, ? super Emitter<T>, S> generator,
Consumer<? super S> disposeState, S initialState) {
Expand Down Expand Up @@ -96,21 +98,27 @@ public void request(long n) {
while (e != n) {

if (cancelled) {
state = null;
dispose(s);
return;
}

hasNext = false;

try {
s = f.apply(s, this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancelled = true;
actual.onError(ex);
state = null;
onError(ex);
dispose(s);
return;
}

if (terminate) {
cancelled = true;
state = null;
dispose(s);
return;
}
Expand Down Expand Up @@ -146,33 +154,48 @@ public void cancel() {

// if there are no running requests, just dispose the state
if (BackpressureHelper.add(this, 1) == 0) {
dispose(state);
S s = state;
state = null;
dispose(s);
}
}
}

@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
if (!terminate) {
if (hasNext) {
onError(new IllegalStateException("onNext already called in this generate turn"));
} else {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
} else {
hasNext = true;
actual.onNext(t);
}
}
}
actual.onNext(t);
}

@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
if (terminate) {
RxJavaPlugins.onError(t);
} else {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
terminate = true;
actual.onError(t);
}
terminate = true;
actual.onError(t);
}

@Override
public void onComplete() {
terminate = true;
actual.onComplete();
if (!terminate) {
terminate = true;
actual.onComplete();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ static final class GeneratorDisposable<T, S>

boolean terminate;

boolean hasNext;

GeneratorDisposable(Observer<? super T> actual,
BiFunction<S, ? super Emitter<T>, S> generator,
Consumer<? super S> disposeState, S initialState) {
Expand Down Expand Up @@ -92,13 +94,16 @@ public void run() {
return;
}

hasNext = false;

try {
s = f.apply(s, this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
state = null;
cancelled = true;
actual.onError(ex);
onError(ex);
dispose(s);
return;
}

Expand Down Expand Up @@ -131,28 +136,42 @@ public boolean isDisposed() {
return cancelled;
}


@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
if (!terminate) {
if (hasNext) {
onError(new IllegalStateException("onNext already called in this generate turn"));
} else {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
} else {
hasNext = true;
actual.onNext(t);
}
}
}
actual.onNext(t);
}

@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
if (terminate) {
RxJavaPlugins.onError(t);
} else {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
terminate = true;
actual.onError(t);
}
terminate = true;
actual.onError(t);
}

@Override
public void onComplete() {
terminate = true;
actual.onComplete();
if (!terminate) {
terminate = true;
actual.onComplete();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,4 +236,50 @@ public void run() {
ts.assertValueCount(1000);
}
}

@Test
public void multipleOnNext() {
Flowable.generate(new Consumer<Emitter<Object>>() {
@Override
public void accept(Emitter<Object> e) throws Exception {
e.onNext(1);
e.onNext(2);
}
})
.test(1)
.assertFailure(IllegalStateException.class, 1);
}

@Test
public void multipleOnError() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Flowable.generate(new Consumer<Emitter<Object>>() {
@Override
public void accept(Emitter<Object> e) throws Exception {
e.onError(new TestException("First"));
e.onError(new TestException("Second"));
}
})
.test(1)
.assertFailure(TestException.class);

TestHelper.assertError(errors, 0, TestException.class, "Second");
} finally {
RxJavaPlugins.reset();
}
}

@Test
public void multipleOnComplete() {
Flowable.generate(new Consumer<Emitter<Object>>() {
@Override
public void accept(Emitter<Object> e) throws Exception {
e.onComplete();
e.onComplete();
}
})
.test(1)
.assertResult();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,50 @@ public void accept(Integer s, Emitter<Object> e) throws Exception {

assertEquals(0, call[0]);
}

@Test
public void multipleOnNext() {
Observable.generate(new Consumer<Emitter<Object>>() {
@Override
public void accept(Emitter<Object> e) throws Exception {
e.onNext(1);
e.onNext(2);
}
})
.test()
.assertFailure(IllegalStateException.class, 1);
}

@Test
public void multipleOnError() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Observable.generate(new Consumer<Emitter<Object>>() {
@Override
public void accept(Emitter<Object> e) throws Exception {
e.onError(new TestException("First"));
e.onError(new TestException("Second"));
}
})
.test()
.assertFailure(TestException.class);

TestHelper.assertError(errors, 0, TestException.class, "Second");
} finally {
RxJavaPlugins.reset();
}
}

@Test
public void multipleOnComplete() {
Observable.generate(new Consumer<Emitter<Object>>() {
@Override
public void accept(Emitter<Object> e) throws Exception {
e.onComplete();
e.onComplete();
}
})
.test()
.assertResult();
}
}

0 comments on commit 4851637

Please sign in to comment.