diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorSingle.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorSingle.java index 329314c34b..fd3fd54b37 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorSingle.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorSingle.java @@ -60,6 +60,9 @@ public void onNext(T value) { } else { this.value = value; isNonEmpty = true; + // Issue: https://github.com/Netflix/RxJava/pull/1527 + // Because we cache a value and don't emit now, we need to request another one. + request(1); } } diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLast.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLast.java index 9b1ac935eb..b5fce7216f 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLast.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLast.java @@ -108,9 +108,12 @@ void startEmitting() { @Override public void request(long n) { - long _c = 0; + if (requested == Long.MAX_VALUE) { + return; + } + long _c; if (n == Long.MAX_VALUE) { - requested = Long.MAX_VALUE; + _c = REQUESTED_UPDATER.getAndSet(this, Long.MAX_VALUE); } else { _c = REQUESTED_UPDATER.getAndAdd(this, n); } @@ -122,16 +125,20 @@ public void request(long n) { } void emit(long previousRequested) { - if (requested < 0) { + if (requested == Long.MAX_VALUE) { // fast-path without backpressure - try { - for (Object value : deque) { - notification.accept(subscriber, value); + if (previousRequested == 0) { + try { + for (Object value : deque) { + notification.accept(subscriber, value); + } + } catch (Throwable e) { + subscriber.onError(e); + } finally { + deque.clear(); } - } catch (Throwable e) { - subscriber.onError(e); - } finally { - deque.clear(); + } else { + // backpressure path will handle Long.MAX_VALUE and emit the rest events. } } else { // backpressure is requested @@ -155,12 +162,22 @@ void emit(long previousRequested) { emitted++; } } - - if (REQUESTED_UPDATER.addAndGet(this, -emitted) == 0) { - // we're done emitting the number requested so return - return; + for (;;) { + long oldRequested = requested; + long newRequested = oldRequested - emitted; + if (oldRequested == Long.MAX_VALUE) { + // became unbounded during the loop + // continue the outer loop to emit the rest events. + break; + } + if (REQUESTED_UPDATER.compareAndSet(this, oldRequested, newRequested)) { + if (newRequested == 0) { + // we're done emitting the number requested so return + return; + } + break; + } } - } } } diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorSingleTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorSingleTest.java index a900e4c706..6910390a51 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorSingleTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorSingleTest.java @@ -15,10 +15,9 @@ */ package rx.internal.operators; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; +import static org.mockito.Mockito.*; import java.util.NoSuchElementException; @@ -27,7 +26,9 @@ import rx.Observable; import rx.Observer; +import rx.Subscriber; import rx.functions.Func1; +import rx.functions.Func2; public class OperatorSingleTest { @@ -241,4 +242,52 @@ public Boolean call(Integer t1) { inOrder.verify(observer, times(1)).onCompleted(); inOrder.verifyNoMoreInteractions(); } + + @Test + public void testSingleWithBackpressure() { + Observable observable = Observable.from(1, 2).single(); + + Subscriber subscriber = spy(new Subscriber() { + + @Override + public void onStart() { + request(1); + } + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Integer integer) { + request(1); + } + }); + observable.subscribe(subscriber); + + InOrder inOrder = inOrder(subscriber); + inOrder.verify(subscriber, times(1)).onError(isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test(timeout = 30000) + public void testIssue1527() throws InterruptedException { + //https://github.com/Netflix/RxJava/pull/1527 + Observable source = Observable.from(1, 2, 3, 4, 5, 6); + Observable reduced = source.reduce(new Func2() { + @Override + public Integer call(Integer i1, Integer i2) { + return i1 + i2; + } + }); + + Integer r = reduced.toBlocking().first(); + assertEquals(21, r.intValue()); + } } diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorTakeLastTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorTakeLastTest.java index 1db81cf9fd..13b626bf6e 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorTakeLastTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorTakeLastTest.java @@ -30,7 +30,9 @@ import rx.Observable; import rx.Observer; +import rx.Subscriber; import rx.functions.Func1; +import rx.functions.Functions; import rx.internal.util.RxRingBuffer; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; @@ -148,4 +150,118 @@ public Integer call(Integer i) { }; } + @Test + public void testIssue1522() { + // https://github.com/Netflix/RxJava/issues/1522 + assertEquals(0, Observable + .empty() + .count() + .filter(Functions.alwaysFalse()) + .toList() + .toBlocking().single().size()); + } + + @Test + public void testIgnoreRequest1() { + // If `takeLast` does not ignore `request` properly, StackOverflowError will be thrown. + Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber() { + + @Override + public void onStart() { + request(Long.MAX_VALUE); + } + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(Integer integer) { + request(Long.MAX_VALUE); + } + }); + } + + @Test + public void testIgnoreRequest2() { + // If `takeLast` does not ignore `request` properly, StackOverflowError will be thrown. + Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber() { + + @Override + public void onStart() { + request(1); + } + + @Override + public void onCompleted() { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(Integer integer) { + request(1); + } + }); + } + + @Test(timeout = 30000) + public void testIgnoreRequest3() { + // If `takeLast` does not ignore `request` properly, it will enter an infinite loop. + Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber() { + + @Override + public void onStart() { + request(1); + } + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(Integer integer) { + request(Long.MAX_VALUE); + } + }); + } + + + @Test + public void testIgnoreRequest4() { + // If `takeLast` does not ignore `request` properly, StackOverflowError will be thrown. + Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber() { + + @Override + public void onStart() { + request(Long.MAX_VALUE); + } + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(Integer integer) { + request(1); + } + }); + } }