diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index 2da1844ca9..282531c0de 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -131,8 +131,8 @@ private static final class MergeSubscriber extends Subscriber t) { T value = t.get(); - if (getEmitLock()) { - boolean moreToDrain; - try { - actual.onNext(value); - } finally { - moreToDrain = releaseEmitLock(); - } - if (moreToDrain) { - drainQueuesIfNeeded(); - } - request(1); - return; - } else { - try { - getOrCreateScalarValueQueue().onNext(value); - } catch (MissingBackpressureException e) { - onError(e); + try { + synchronized (this) { + if (emitLock) { + missedEmitting = true; + getOrCreateScalarValueQueue().onNext(value); + return; + } + missedEmitting = false; + emitLock = true; } + } catch (MissingBackpressureException e) { + onError(e); return; } + boolean moreToDrain; + try { + actual.onNext(value); + } finally { + moreToDrain = releaseEmitLock(); + } + if (moreToDrain) { + drainQueuesIfNeeded(); + } + request(1); } private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronousObservable t) { + + // if we didn't return above we need to enqueue + // enqueue the values for later delivery + try { + getOrCreateScalarValueQueue().onNext(t.get()); + } catch (MissingBackpressureException e) { + onError(e); + return; + } + if (getEmitLock()) { - boolean emitted = false; + int emitted = 0; boolean moreToDrain; - boolean isReturn = false; try { - long r = mergeProducer.requested; - if (r > 0) { - emitted = true; - actual.onNext(t.get()); - MergeProducer.REQUESTED.decrementAndGet(mergeProducer); - // we handle this Observable without ever incrementing the wip or touching other machinery so just return here - isReturn = true; - } + emitted = drainScalarValueQueue(); } finally { moreToDrain = releaseEmitLock(); } if (moreToDrain) { drainQueuesIfNeeded(); } - if (emitted) { - request(1); - } - if (isReturn) { - return; + if (emitted > 0) { + request(emitted); } } - - // if we didn't return above we need to enqueue - // enqueue the values for later delivery - try { - getOrCreateScalarValueQueue().onNext(t.get()); - } catch (MissingBackpressureException e) { - onError(e); - } } private RxRingBuffer getOrCreateScalarValueQueue() { @@ -316,20 +311,16 @@ private RxRingBuffer getOrCreateScalarValueQueue() { private synchronized boolean releaseEmitLock() { emitLock = false; - if (missedEmitting == 0) { - return false; - } else { - return true; - } + return missedEmitting; } private synchronized boolean getEmitLock() { if (emitLock) { - missedEmitting++; + missedEmitting = true; return false; } else { emitLock = true; - missedEmitting = 0; + missedEmitting = false; return true; } } @@ -446,7 +437,8 @@ private void innerError(Throwable e, boolean parent) { if (!parent) { wip--; } - if ((wip == 0 && completed) || (wip < 0)) { + RxRingBuffer svq = scalarValueQueue; + if ((wip == 0 && completed && (svq == null || svq.isEmpty())) || (wip < 0)) { sendOnComplete = true; } } @@ -463,7 +455,8 @@ public void onCompleted() { boolean c = false; synchronized (this) { completed = true; - if (wip == 0) { + RxRingBuffer svq = scalarValueQueue; + if (wip == 0 && (svq == null || svq.isEmpty())) { c = true; } } @@ -477,7 +470,8 @@ void completeInner(InnerSubscriber s) { boolean sendOnComplete = false; synchronized (this) { wip--; - if (wip == 0 && completed) { + RxRingBuffer svq = scalarValueQueue; + if (wip == 0 && completed && (svq == null || svq.isEmpty())) { sendOnComplete = true; } } @@ -491,12 +485,12 @@ private void drainAndComplete() { boolean moreToDrain = true; while (moreToDrain) { synchronized (this) { - missedEmitting = 0; + missedEmitting = false; } drainScalarValueQueue(); drainChildrenQueues(); synchronized (this) { - moreToDrain = missedEmitting > 0; + moreToDrain = missedEmitting; } } RxRingBuffer svq = scalarValueQueue; @@ -549,7 +543,8 @@ public void request(long n) { if (ms.drainQueuesIfNeeded()) { boolean sendComplete = false; synchronized (ms) { - if (ms.wip == 0 && ms.scalarValueQueue != null && ms.scalarValueQueue.isEmpty()) { + RxRingBuffer svq = ms.scalarValueQueue; + if (ms.wip == 0 && ms.completed && (svq == null || svq.isEmpty())) { sendComplete = true; } } diff --git a/src/test/java/rx/internal/operators/OperatorFlatMapTest.java b/src/test/java/rx/internal/operators/OperatorFlatMapTest.java index a4635f1512..7b32359524 100644 --- a/src/test/java/rx/internal/operators/OperatorFlatMapTest.java +++ b/src/test/java/rx/internal/operators/OperatorFlatMapTest.java @@ -15,6 +15,7 @@ */ package rx.internal.operators; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; @@ -416,4 +417,73 @@ public void testFlatMapTransformsMaxConcurrentNormal() { verify(o, never()).onNext(5); verify(o, never()).onError(any(Throwable.class)); } + @Test(timeout = 10000) + public void flatMapRangeAsyncLoop() { + for (int i = 0; i < 2000; i++) { + if (i % 10 == 0) { + System.out.println("flatMapRangeAsyncLoop > " + i); + } + TestSubscriber ts = new TestSubscriber(); + Observable.range(0, 1000) + .flatMap(new Func1>() { + @Override + public Observable call(Integer t) { + return Observable.just(t); + } + }) + .observeOn(Schedulers.computation()) + .subscribe(ts); + + ts.awaitTerminalEvent(2500, TimeUnit.MILLISECONDS); + if (ts.getOnCompletedEvents().isEmpty()) { + System.out.println(ts.getOnNextEvents().size()); + } + ts.assertTerminalEvent(); + ts.assertNoErrors(); + List list = ts.getOnNextEvents(); + assertEquals(1000, list.size()); + boolean f = false; + for (int j = 0; j < list.size(); j++) { + if (list.get(j) != j) { + System.out.println(j + " " + list.get(j)); + f = true; + } + } + if (f) { + Assert.fail("Results are out of order!"); + } + } + } + @Test(timeout = 10000) + public void flatMapRangeMixedAsyncLoop() { + for (int i = 0; i < 2000; i++) { + if (i % 10 == 0) { + System.out.println("flatMapRangeAsyncLoop > " + i); + } + TestSubscriber ts = new TestSubscriber(); + Observable.range(0, 1000) + .flatMap(new Func1>() { + final Random rnd = new Random(); + @Override + public Observable call(Integer t) { + Observable r = Observable.just(t); + if (rnd.nextBoolean()) { + r = r.asObservable(); + } + return r; + } + }) + .observeOn(Schedulers.computation()) + .subscribe(ts); + + ts.awaitTerminalEvent(2500, TimeUnit.MILLISECONDS); + if (ts.getOnCompletedEvents().isEmpty()) { + System.out.println(ts.getOnNextEvents().size()); + } + ts.assertTerminalEvent(); + ts.assertNoErrors(); + List list = ts.getOnNextEvents(); + assertEquals(1000, list.size()); + } + } } diff --git a/src/test/java/rx/internal/operators/OperatorMergeTest.java b/src/test/java/rx/internal/operators/OperatorMergeTest.java index 7d785b4088..0c031007f4 100644 --- a/src/test/java/rx/internal/operators/OperatorMergeTest.java +++ b/src/test/java/rx/internal/operators/OperatorMergeTest.java @@ -628,7 +628,7 @@ public void onNext(Integer t) { assertTrue(generated1.get() >= RxRingBuffer.SIZE * 2 && generated1.get() <= RxRingBuffer.SIZE * 4); } - @Test + @Test//(timeout = 10000) public void testBackpressureUpstream2InLoop() throws InterruptedException { for (int i = 0; i < 1000; i++) { System.err.flush(); @@ -651,7 +651,7 @@ public void onNext(Integer t) { }; Observable.merge(o1.take(RxRingBuffer.SIZE * 2), Observable.just(-99)).subscribe(testSubscriber); - testSubscriber.awaitTerminalEvent(); + testSubscriber.awaitTerminalEvent(2, TimeUnit.SECONDS); List onNextEvents = testSubscriber.getOnNextEvents();