Skip to content

Commit

Permalink
Merge pull request #2550 from davidmoten/request-additive
Browse files Browse the repository at this point in the history
Subscriber.onStart requests should be additive (and check for overflow)
  • Loading branch information
akarnokd committed Jan 28, 2015
2 parents a82800c + 7199792 commit 0da0d44
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 4 deletions.
10 changes: 9 additions & 1 deletion src/main/java/rx/Subscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,16 @@ protected final void request(long n) {
synchronized (this) {
if (p != null) {
shouldRequest = p;
} else {
} else if (requested == Long.MIN_VALUE) {
requested = n;
} else {
final long total = requested + n;
// check if overflow occurred
if (total < 0) {
requested = Long.MAX_VALUE;
} else {
requested = total;
}
}
}
// after releasing lock
Expand Down
60 changes: 57 additions & 3 deletions src/test/java/rx/SubscriberTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -343,7 +346,6 @@ public void onError(Throwable e) {

@Override
public void onNext(Integer t) {
System.out.println(t);
request(1);
}

Expand Down Expand Up @@ -375,7 +377,6 @@ public void onError(Throwable e) {

@Override
public void onNext(Integer t) {
System.out.println(t);
request(1);
}

Expand Down Expand Up @@ -411,7 +412,6 @@ public void onError(Throwable e) {

@Override
public void onNext(Integer t) {
System.out.println(t);
child.onNext(t);
request(1);
}
Expand Down Expand Up @@ -454,4 +454,58 @@ public void onNext(Integer t) {
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(exception.get() instanceof IllegalArgumentException);
}

@Test
public void testOnStartRequestsAreAdditive() {
final List<Integer> list = new ArrayList<Integer>();
Observable.just(1,2,3,4,5).subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
request(3);
request(2);
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer t) {
list.add(t);
}});
assertEquals(Arrays.asList(1,2,3,4,5), list);
}

@Test
public void testOnStartRequestsAreAdditiveAndOverflowBecomesMaxValue() {
final List<Integer> list = new ArrayList<Integer>();
Observable.just(1,2,3,4,5).subscribe(new Subscriber<Integer>() {
@Override
public void onStart() {
request(2);
request(Long.MAX_VALUE-1);
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer t) {
list.add(t);
}});
assertEquals(Arrays.asList(1,2,3,4,5), list);
}
}

0 comments on commit 0da0d44

Please sign in to comment.