Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BufferWithSize with Backpressure Support #1507

Merged
merged 2 commits into from
Aug 11, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import rx.Observable;
import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;

/**
Expand Down Expand Up @@ -51,6 +53,12 @@ public final class OperatorBufferWithSize<T> implements Operator<List<T>, T> {
* into a buffer at all!
*/
public OperatorBufferWithSize(int count, int skip) {
if (count <= 0) {
throw new IllegalArgumentException("count must be greater than 0");
}
if (skip <= 0) {
throw new IllegalArgumentException("skip must be greater than 0");
}
this.count = count;
this.skip = skip;
}
Expand All @@ -60,6 +68,29 @@ public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
if (count == skip) {
return new Subscriber<T>(child) {
List<T> buffer;

@Override
public void setProducer(final Producer producer) {
child.setProducer(new Producer() {

private volatile boolean infinite = false;

@Override
public void request(long n) {
if (infinite) {
return;
}
if (n >= Long.MAX_VALUE / count) {
// n == Long.MAX_VALUE or n * count >= Long.MAX_VALUE
infinite = true;
producer.request(Long.MAX_VALUE);
} else {
producer.request(n * count);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could theoretically be larger than Long.MAX_VALUE

}
}
});
}

@Override
public void onNext(T t) {
if (buffer == null) {
Expand Down Expand Up @@ -98,6 +129,60 @@ public void onCompleted() {
return new Subscriber<T>(child) {
final List<List<T>> chunks = new LinkedList<List<T>>();
int index;

@Override
public void setProducer(final Producer producer) {
child.setProducer(new Producer() {

private volatile boolean firstRequest = true;
private volatile boolean infinite = false;

private void requestInfinite() {
infinite = true;
producer.request(Long.MAX_VALUE);
}

@Override
public void request(long n) {
if (infinite) {
return;
}
if (n == Long.MAX_VALUE) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't checking for whether it is already Long.MAX_VALUE and should be ignored.

Possibly in this case the fast path isn't worth having?

requestInfinite();
return;
} else {
if (firstRequest) {
firstRequest = false;
if (n - 1 >= (Long.MAX_VALUE - count) / skip) {
// count + skip * (n - 1) >= Long.MAX_VALUE
requestInfinite();
return;
}
// count = 5, skip = 2, n = 3
// * * * * *
// * * * * *
// * * * * *
// request = 5 + 2 * ( 3 - 1)
producer.request(count + skip * (n - 1));
} else {
if (n >= Long.MAX_VALUE / skip) {
// skip * n >= Long.MAX_VALUE
requestInfinite();
return;
}
// count = 5, skip = 2, n = 3
// (* * *) * *
// ( *) * * * *
// * * * * *
// request = skip * n
// "()" means the items already emitted before this request
producer.request(skip * n);
}
}
}
});
}

@Override
public void onNext(T t) {
if (index++ % skip == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder;
Expand All @@ -28,6 +29,7 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -36,6 +38,7 @@

import rx.Observable;
import rx.Observer;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
Expand All @@ -44,6 +47,7 @@
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;
import rx.subjects.PublishSubject;

Expand Down Expand Up @@ -791,4 +795,190 @@ public Observable<Integer> call(Integer t1) {
verify(o, never()).onCompleted();
verify(o).onError(any(TestException.class));
}

@Test
public void testProducerRequestThroughBufferWithSize1() {
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
ts.requestMore(3);
final AtomicLong requested = new AtomicLong();
Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> s) {
s.setProducer(new Producer() {

@Override
public void request(long n) {
requested.set(n);
}

});
}

}).buffer(5, 5).subscribe(ts);
assertEquals(15, requested.get());

ts.requestMore(4);
assertEquals(20, requested.get());
}

@Test
public void testProducerRequestThroughBufferWithSize2() {
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
final AtomicLong requested = new AtomicLong();
Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> s) {
s.setProducer(new Producer() {

@Override
public void request(long n) {
requested.set(n);
}

});
}

}).buffer(5, 5).subscribe(ts);
assertEquals(Long.MAX_VALUE, requested.get());
}

@Test
public void testProducerRequestThroughBufferWithSize3() {
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
ts.requestMore(3);
final AtomicLong requested = new AtomicLong();
Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> s) {
s.setProducer(new Producer() {

@Override
public void request(long n) {
requested.set(n);
}

});
}

}).buffer(5, 2).subscribe(ts);
assertEquals(9, requested.get());
ts.requestMore(3);
assertEquals(6, requested.get());
}

@Test
public void testProducerRequestThroughBufferWithSize4() {
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
final AtomicLong requested = new AtomicLong();
Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> s) {
s.setProducer(new Producer() {

@Override
public void request(long n) {
requested.set(n);
}

});
}

}).buffer(5, 2).subscribe(ts);
assertEquals(Long.MAX_VALUE, requested.get());
}


@Test
public void testProducerRequestOverflowThroughBufferWithSize1() {
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
ts.requestMore(Long.MAX_VALUE / 2);
final AtomicLong requested = new AtomicLong();
Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> s) {
s.setProducer(new Producer() {

@Override
public void request(long n) {
requested.set(n);
}

});
}

}).buffer(3, 3).subscribe(ts);
assertEquals(Long.MAX_VALUE, requested.get());
}

@Test
public void testProducerRequestOverflowThroughBufferWithSize2() {
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
ts.requestMore(Long.MAX_VALUE / 2);
final AtomicLong requested = new AtomicLong();
Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> s) {
s.setProducer(new Producer() {

@Override
public void request(long n) {
requested.set(n);
}

});
}

}).buffer(3, 2).subscribe(ts);
assertEquals(Long.MAX_VALUE, requested.get());
}

@Test
public void testProducerRequestOverflowThroughBufferWithSize3() {
final AtomicLong requested = new AtomicLong();
Observable.create(new Observable.OnSubscribe<Integer>() {

@Override
public void call(final Subscriber<? super Integer> s) {
s.setProducer(new Producer() {

@Override
public void request(long n) {
requested.set(n);
s.onNext(1);
s.onNext(2);
s.onNext(3);
}

});
}

}).buffer(3, 2).subscribe(new Subscriber<List<Integer>>() {

@Override
public void onStart() {
request(Long.MAX_VALUE / 2 - 4);
}

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(List<Integer> t) {
request(Long.MAX_VALUE / 2);
}

});
assertEquals(Long.MAX_VALUE, requested.get());
}
}