Skip to content

Commit

Permalink
Merge pull request #2590 from akarnokd/ZipHangFix
Browse files Browse the repository at this point in the history
Zip: fixed unbounded downstream requesting above Long.MAX_VALUE
  • Loading branch information
benjchristensen committed Feb 3, 2015
2 parents 0cecfc5 + 01e97fc commit 43a912f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
7 changes: 5 additions & 2 deletions src/main/java/rx/internal/operators/OperatorZip.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public Subscriber<? super Observable[]> call(final Subscriber<? super R> child)
return subscriber;
}

@SuppressWarnings("rawtypes")
private final class ZipSubscriber extends Subscriber<Observable[]> {

final Subscriber<? super R> child;
Expand Down Expand Up @@ -158,7 +159,8 @@ public void onNext(Observable[] observables) {
}

private static final class ZipProducer<R> extends AtomicLong implements Producer {

/** */
private static final long serialVersionUID = -1216676403723546796L;
private Zip<R> zipper;

public ZipProducer(Zip<R> zipper) {
Expand All @@ -167,7 +169,7 @@ public ZipProducer(Zip<R> zipper) {

@Override
public void request(long n) {
addAndGet(n);
BackpressureUtils.getAndAddRequest(this, n);
// try and claim emission if no other threads are doing so
zipper.tick();
}
Expand All @@ -179,6 +181,7 @@ private static final class Zip<R> {
private final FuncN<? extends R> zipFunction;
private final CompositeSubscription childSubscription = new CompositeSubscription();

@SuppressWarnings("unused")
volatile long counter;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<Zip> COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(Zip.class, "counter");
Expand Down
23 changes: 23 additions & 0 deletions src/test/java/rx/internal/operators/OperatorZipTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1243,4 +1243,27 @@ public Integer call(Integer i1, Integer i2) {
}
assertEquals(expected, zip2.toList().toBlocking().single());
}
@Test
public void testUnboundedDownstreamOverrequesting() {
Observable<Integer> source = Observable.range(1, 2).zipWith(Observable.range(1, 2), new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer t1, Integer t2) {
return t1 + 10 * t2;
}
});

TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
@Override
public void onNext(Integer t) {
super.onNext(t);
requestMore(5);
}
};

source.subscribe(ts);

ts.assertNoErrors();
ts.assertTerminalEvent();
ts.assertReceivedOnNext(Arrays.asList(11, 22));
}
}

0 comments on commit 43a912f

Please sign in to comment.