diff --git a/src/main/java/rx/internal/operators/OperatorZip.java b/src/main/java/rx/internal/operators/OperatorZip.java index 2c28eb0112..d7271478a7 100644 --- a/src/main/java/rx/internal/operators/OperatorZip.java +++ b/src/main/java/rx/internal/operators/OperatorZip.java @@ -117,6 +117,7 @@ public Subscriber call(final Subscriber child) return subscriber; } + @SuppressWarnings("rawtypes") private final class ZipSubscriber extends Subscriber { final Subscriber child; @@ -158,7 +159,8 @@ public void onNext(Observable[] observables) { } private static final class ZipProducer extends AtomicLong implements Producer { - + /** */ + private static final long serialVersionUID = -1216676403723546796L; private Zip zipper; public ZipProducer(Zip zipper) { @@ -167,7 +169,7 @@ public ZipProducer(Zip zipper) { @Override public void request(long n) { - addAndGet(n); + BackpressureUtils.getAndAddRequest(this, n); // try and claim emission if no other threads are doing so zipper.tick(); } @@ -179,6 +181,7 @@ private static final class Zip { private final FuncN zipFunction; private final CompositeSubscription childSubscription = new CompositeSubscription(); + @SuppressWarnings("unused") volatile long counter; @SuppressWarnings("rawtypes") static final AtomicLongFieldUpdater COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(Zip.class, "counter"); diff --git a/src/test/java/rx/internal/operators/OperatorZipTest.java b/src/test/java/rx/internal/operators/OperatorZipTest.java index ffcf9a769e..166be948f6 100644 --- a/src/test/java/rx/internal/operators/OperatorZipTest.java +++ b/src/test/java/rx/internal/operators/OperatorZipTest.java @@ -1243,4 +1243,27 @@ public Integer call(Integer i1, Integer i2) { } assertEquals(expected, zip2.toList().toBlocking().single()); } + @Test + public void testUnboundedDownstreamOverrequesting() { + Observable source = Observable.range(1, 2).zipWith(Observable.range(1, 2), new Func2() { + @Override + public Integer call(Integer t1, Integer t2) { + return t1 + 10 * t2; + } + }); + + TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + requestMore(5); + } + }; + + source.subscribe(ts); + + ts.assertNoErrors(); + ts.assertTerminalEvent(); + ts.assertReceivedOnNext(Arrays.asList(11, 22)); + } }