diff --git a/src/main/java/rx/internal/operators/OperatorObserveOn.java b/src/main/java/rx/internal/operators/OperatorObserveOn.java index d5cc42c97e..99394764a6 100644 --- a/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -26,7 +26,6 @@ import rx.exceptions.MissingBackpressureException; import rx.functions.Action0; import rx.internal.util.RxRingBuffer; -import rx.internal.util.SynchronizedSubscription; import rx.schedulers.ImmediateScheduler; import rx.schedulers.TrampolineScheduler; @@ -97,12 +96,10 @@ public void request(long n) { } }); - add(scheduledUnsubscribe); child.add(recursiveScheduler); child.add(this); - } - + @Override public void onStart() { // signal that this is an async operator capable of receiving this many @@ -160,53 +157,52 @@ public void call() { } } + // only execute this from schedule() private void pollQueue() { int emitted = 0; - while (true) { + do { + /* + * Set to 1 otherwise it could have grown very large while in the last poll loop + * and then we can end up looping all those times again here before exiting even once we've drained + */ + COUNTER_UPDATER.set(this, 1); + while (!scheduledUnsubscribe.isUnsubscribed()) { - if (REQUESTED.getAndDecrement(this) != 0) { + if (failure) { + // special handling to short-circuit an error propagation Object o = queue.poll(); - if (o == null) { - // nothing in queue - REQUESTED.incrementAndGet(this); - break; - } else { - if (failure) { - // completed so we will skip onNext if they exist and only emit terminal events - if (on.isError(o)) { - System.out.println("Error: " + o); - // only emit error - on.accept(child, o); - } + // completed so we will skip onNext if they exist and only emit terminal events + if (on.isError(o)) { + // only emit error + on.accept(child, o); + // we have emitted a terminal event so return (exit the loop we're in) + return; + } + } else { + if (REQUESTED.getAndDecrement(this) != 0) { + Object o = queue.poll(); + if (o == null) { + // nothing in queue + REQUESTED.incrementAndGet(this); + break; } else { if (!on.accept(child, o)) { // non-terminal event so let's increment count emitted++; } } + } else { + // we hit the end ... so increment back to 0 again + REQUESTED.incrementAndGet(this); + break; } - } else { - // we hit the end ... so increment back to 0 again - REQUESTED.incrementAndGet(this); - break; } } - long c = COUNTER_UPDATER.decrementAndGet(this); - if (c <= 0) { - // request the number of items that we emitted in this poll loop - if (emitted > 0) { - request(emitted); - } - break; - } else { - /* - * Set down to 1 and then iterate again. - * we lower it to 1 otherwise it could have grown very large while in the last poll loop - * and then we can end up looping all those times again here before existing even once we've drained - */ - COUNTER_UPDATER.set(this, 1); - // we now loop again, and if anything tries scheduling again after this it will increment and cause us to loop again after - } + } while (COUNTER_UPDATER.decrementAndGet(this) > 0); + + // request the number of items that we emitted in this poll loop + if (emitted > 0) { + request(emitted); } } } diff --git a/src/test/java/rx/internal/operators/OperatorObserveOnTest.java b/src/test/java/rx/internal/operators/OperatorObserveOnTest.java index 724ccb8bde..0dbb5bdaaa 100644 --- a/src/test/java/rx/internal/operators/OperatorObserveOnTest.java +++ b/src/test/java/rx/internal/operators/OperatorObserveOnTest.java @@ -34,9 +34,11 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.junit.Ignore; import org.junit.Test; import org.mockito.InOrder; +import rx.Notification; import rx.Observable; import rx.Observable.OnSubscribe; import rx.Observer; @@ -48,6 +50,7 @@ import rx.functions.Action0; import rx.functions.Action1; import rx.functions.Func1; +import rx.functions.Func2; import rx.internal.util.RxRingBuffer; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; @@ -391,7 +394,7 @@ public void testDelayedErrorDeliveryWhenSafeSubscriberUnsubscribes() { inOrder.verify(o, never()).onNext(anyInt()); inOrder.verify(o, never()).onCompleted(); } - + @Test public void testAfterUnsubscribeCalledThenObserverOnNextNeverCalled() { final TestScheduler testScheduler = new TestScheduler(); @@ -647,6 +650,71 @@ public void onNext(Long t) { assertTrue(ts.getOnNextEvents().size() == ts.getOnNextEvents().get(ts.getOnNextEvents().size() - 1) + 1); // we should emit the error without emitting the full buffer size assertTrue(ts.getOnNextEvents().size() < RxRingBuffer.SIZE); + } + + /** + * Make sure we get a MissingBackpressureException propagated through when we have a fast temporal (hot) producer. + */ + @Test + public void testHotOperatorBackpressure() { + TestSubscriber ts = new TestSubscriber(); + Observable.timer(0, 1, TimeUnit.MICROSECONDS) + .observeOn(Schedulers.computation()) + .map(new Func1() { + + @Override + public String call(Long t1) { + System.out.println(t1); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + return t1 + " slow value"; + } + }).subscribe(ts); + + ts.awaitTerminalEvent(); + System.out.println("Errors: " + ts.getOnErrorEvents()); + assertEquals(1, ts.getOnErrorEvents().size()); + assertEquals(MissingBackpressureException.class, ts.getOnErrorEvents().get(0).getClass()); } + + @Test + public void testErrorPropagatesWhenNoOutstandingRequests() { + Observable timer = Observable.timer(0, 1, TimeUnit.MICROSECONDS) + .doOnEach(new Action1>() { + + @Override + public void call(Notification n) { + // System.out.println("BEFORE " + n); + } + + }) + .observeOn(Schedulers.newThread()) + .doOnEach(new Action1>() { + + @Override + public void call(Notification n) { + // System.out.println("AFTER " + n); + } + + }); + + TestSubscriber ts = new TestSubscriber(); + + Observable.combineLatest(timer, Observable. never(), new Func2() { + + @Override + public Long call(Long t1, Integer t2) { + return t1; + } + + }).take(RxRingBuffer.SIZE * 2).subscribe(ts); + + ts.awaitTerminalEvent(); + assertEquals(1, ts.getOnErrorEvents().size()); + assertEquals(MissingBackpressureException.class, ts.getOnErrorEvents().get(0).getClass()); + } + }