Skip to content

Commit

Permalink
Merge pull request #1507 from zsxwing/backpressure-bufferWithSize
Browse files Browse the repository at this point in the history
BufferWithSize with Backpressure Support
  • Loading branch information
benjchristensen committed Aug 11, 2014
2 parents 54c222c + e89a39f commit 4e4b0cb
Show file tree
Hide file tree
Showing 2 changed files with 275 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import rx.Observable;
import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;

/**
Expand Down Expand Up @@ -51,6 +53,12 @@ public final class OperatorBufferWithSize<T> implements Operator<List<T>, T> {
* into a buffer at all!
*/
public OperatorBufferWithSize(int count, int skip) {
if (count <= 0) {
throw new IllegalArgumentException("count must be greater than 0");
}
if (skip <= 0) {
throw new IllegalArgumentException("skip must be greater than 0");
}
this.count = count;
this.skip = skip;
}
Expand All @@ -60,6 +68,29 @@ public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
if (count == skip) {
return new Subscriber<T>(child) {
List<T> buffer;

@Override
public void setProducer(final Producer producer) {
child.setProducer(new Producer() {

private volatile boolean infinite = false;

@Override
public void request(long n) {
if (infinite) {
return;
}
if (n >= Long.MAX_VALUE / count) {
// n == Long.MAX_VALUE or n * count >= Long.MAX_VALUE
infinite = true;
producer.request(Long.MAX_VALUE);
} else {
producer.request(n * count);
}
}
});
}

@Override
public void onNext(T t) {
if (buffer == null) {
Expand Down Expand Up @@ -98,6 +129,60 @@ public void onCompleted() {
return new Subscriber<T>(child) {
final List<List<T>> chunks = new LinkedList<List<T>>();
int index;

@Override
public void setProducer(final Producer producer) {
child.setProducer(new Producer() {

private volatile boolean firstRequest = true;
private volatile boolean infinite = false;

private void requestInfinite() {
infinite = true;
producer.request(Long.MAX_VALUE);
}

@Override
public void request(long n) {
if (infinite) {
return;
}
if (n == Long.MAX_VALUE) {
requestInfinite();
return;
} else {
if (firstRequest) {
firstRequest = false;
if (n - 1 >= (Long.MAX_VALUE - count) / skip) {
// count + skip * (n - 1) >= Long.MAX_VALUE
requestInfinite();
return;
}
// count = 5, skip = 2, n = 3
// * * * * *
// * * * * *
// * * * * *
// request = 5 + 2 * ( 3 - 1)
producer.request(count + skip * (n - 1));
} else {
if (n >= Long.MAX_VALUE / skip) {
// skip * n >= Long.MAX_VALUE
requestInfinite();
return;
}
// count = 5, skip = 2, n = 3
// (* * *) * *
// ( *) * * * *
// * * * * *
// request = skip * n
// "()" means the items already emitted before this request
producer.request(skip * n);
}
}
}
});
}

@Override
public void onNext(T t) {
if (index++ % skip == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder;
Expand All @@ -28,6 +29,7 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -36,6 +38,7 @@

import rx.Observable;
import rx.Observer;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
Expand All @@ -44,6 +47,7 @@
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;
import rx.subjects.PublishSubject;

Expand Down Expand Up @@ -791,4 +795,190 @@ public Observable<Integer> call(Integer t1) {
verify(o, never()).onCompleted();
verify(o).onError(any(TestException.class));
}

@Test
public void testProducerRequestThroughBufferWithSize1() {
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
ts.requestMore(3);
final AtomicLong requested = new AtomicLong();
Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> s) {
s.setProducer(new Producer() {

@Override
public void request(long n) {
requested.set(n);
}

});
}

}).buffer(5, 5).subscribe(ts);
assertEquals(15, requested.get());

ts.requestMore(4);
assertEquals(20, requested.get());
}

@Test
public void testProducerRequestThroughBufferWithSize2() {
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
final AtomicLong requested = new AtomicLong();
Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> s) {
s.setProducer(new Producer() {

@Override
public void request(long n) {
requested.set(n);
}

});
}

}).buffer(5, 5).subscribe(ts);
assertEquals(Long.MAX_VALUE, requested.get());
}

@Test
public void testProducerRequestThroughBufferWithSize3() {
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
ts.requestMore(3);
final AtomicLong requested = new AtomicLong();
Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> s) {
s.setProducer(new Producer() {

@Override
public void request(long n) {
requested.set(n);
}

});
}

}).buffer(5, 2).subscribe(ts);
assertEquals(9, requested.get());
ts.requestMore(3);
assertEquals(6, requested.get());
}

@Test
public void testProducerRequestThroughBufferWithSize4() {
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
final AtomicLong requested = new AtomicLong();
Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> s) {
s.setProducer(new Producer() {

@Override
public void request(long n) {
requested.set(n);
}

});
}

}).buffer(5, 2).subscribe(ts);
assertEquals(Long.MAX_VALUE, requested.get());
}


@Test
public void testProducerRequestOverflowThroughBufferWithSize1() {
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
ts.requestMore(Long.MAX_VALUE / 2);
final AtomicLong requested = new AtomicLong();
Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> s) {
s.setProducer(new Producer() {

@Override
public void request(long n) {
requested.set(n);
}

});
}

}).buffer(3, 3).subscribe(ts);
assertEquals(Long.MAX_VALUE, requested.get());
}

@Test
public void testProducerRequestOverflowThroughBufferWithSize2() {
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
ts.requestMore(Long.MAX_VALUE / 2);
final AtomicLong requested = new AtomicLong();
Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> s) {
s.setProducer(new Producer() {

@Override
public void request(long n) {
requested.set(n);
}

});
}

}).buffer(3, 2).subscribe(ts);
assertEquals(Long.MAX_VALUE, requested.get());
}

@Test
public void testProducerRequestOverflowThroughBufferWithSize3() {
final AtomicLong requested = new AtomicLong();
Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(final Subscriber<? super Integer> s) {
s.setProducer(new Producer() {

@Override
public void request(long n) {
requested.set(n);
s.onNext(1);
s.onNext(2);
s.onNext(3);
}

});
}

}).buffer(3, 2).subscribe(new Subscriber<List<Integer>>() {

@Override
public void onStart() {
request(Long.MAX_VALUE / 2 - 4);
}

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(List<Integer> t) {
request(Long.MAX_VALUE / 2);
}

});
assertEquals(Long.MAX_VALUE, requested.get());
}
}

0 comments on commit 4e4b0cb

Please sign in to comment.