From 676313a49ef5a80ec652b1cde235eb476e6595e9 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 24 Jul 2014 21:52:13 +0800 Subject: [PATCH 1/2] BufferWithSize with Backpressure Support --- .../operators/OperatorBufferWithSize.java | 57 ++++++++++ .../operators/OperatorBufferTest.java | 100 ++++++++++++++++++ 2 files changed, 157 insertions(+) diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorBufferWithSize.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorBufferWithSize.java index b0add8b72d..cde0caaa1c 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorBufferWithSize.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorBufferWithSize.java @@ -19,8 +19,11 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + import rx.Observable; import rx.Observable.Operator; +import rx.Producer; import rx.Subscriber; /** @@ -51,6 +54,12 @@ public final class OperatorBufferWithSize implements Operator, T> { * into a buffer at all! */ public OperatorBufferWithSize(int count, int skip) { + if (count <= 0) { + throw new IllegalArgumentException("count must be greater than 0"); + } + if (skip <= 0) { + throw new IllegalArgumentException("skip must be greater than 0"); + } this.count = count; this.skip = skip; } @@ -60,6 +69,21 @@ public Subscriber call(final Subscriber> child) { if (count == skip) { return new Subscriber(child) { List buffer; + + @Override + public void setProducer(final Producer producer) { + child.setProducer(new Producer() { + @Override + public void request(long n) { + if (n == Long.MAX_VALUE) { + producer.request(Long.MAX_VALUE); + } else { + producer.request(n * count); + } + } + }); + } + @Override public void onNext(T t) { if (buffer == null) { @@ -98,6 +122,39 @@ public void onCompleted() { return new Subscriber(child) { final List> chunks = new LinkedList>(); int index; + + @Override + public void setProducer(final Producer producer) { + child.setProducer(new Producer() { + + private final AtomicBoolean firstRequest = new AtomicBoolean(false); + + @Override + public void request(long n) { + if (n == Long.MAX_VALUE) { + producer.request(Long.MAX_VALUE); + } else { + if (firstRequest.compareAndSet(false, true)) { + // count = 5, skip = 2, n = 3 + // * * * * * + // * * * * * + // * * * * * + // request = 5 + 2 * ( 3 - 1) + producer.request(count + skip * (n - 1)); + } else { + // count = 5, skip = 2, n = 3 + // (* * *) * * + // ( *) * * * * + // * * * * * + // request = skip * n + // "()" means the items already emitted before this request + producer.request(skip * n); + } + } + } + }); + } + @Override public void onNext(T t) { if (index++ % skip == 0) { diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorBufferTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorBufferTest.java index 6e391f42d3..757af07fb5 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorBufferTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorBufferTest.java @@ -15,6 +15,7 @@ */ package rx.internal.operators; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.mockito.Matchers.any; import static org.mockito.Mockito.inOrder; @@ -28,6 +29,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.junit.Before; import org.junit.Test; @@ -36,6 +38,7 @@ import rx.Observable; import rx.Observer; +import rx.Producer; import rx.Scheduler; import rx.Subscriber; import rx.Subscription; @@ -44,6 +47,7 @@ import rx.functions.Action1; import rx.functions.Func0; import rx.functions.Func1; +import rx.observers.TestSubscriber; import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; @@ -791,4 +795,100 @@ public Observable call(Integer t1) { verify(o, never()).onCompleted(); verify(o).onError(any(TestException.class)); } + + @Test + public void testProducerRequestThroughBufferWithSize1() { + TestSubscriber> ts = new TestSubscriber>(); + ts.requestMore(3); + final AtomicLong requested = new AtomicLong(); + Observable.create(new Observable.OnSubscribe() { + + @Override + public void call(Subscriber s) { + s.setProducer(new Producer() { + + @Override + public void request(long n) { + requested.set(n); + } + + }); + } + + }).buffer(5, 5).subscribe(ts); + assertEquals(15, requested.get()); + + ts.requestMore(4); + assertEquals(20, requested.get()); + } + + @Test + public void testProducerRequestThroughBufferWithSize2() { + TestSubscriber> ts = new TestSubscriber>(); + final AtomicLong requested = new AtomicLong(); + Observable.create(new Observable.OnSubscribe() { + + @Override + public void call(Subscriber s) { + s.setProducer(new Producer() { + + @Override + public void request(long n) { + requested.set(n); + } + + }); + } + + }).buffer(5, 5).subscribe(ts); + assertEquals(Long.MAX_VALUE, requested.get()); + } + + @Test + public void testProducerRequestThroughBufferWithSize3() { + TestSubscriber> ts = new TestSubscriber>(); + ts.requestMore(3); + final AtomicLong requested = new AtomicLong(); + Observable.create(new Observable.OnSubscribe() { + + @Override + public void call(Subscriber s) { + s.setProducer(new Producer() { + + @Override + public void request(long n) { + requested.set(n); + } + + }); + } + + }).buffer(5, 2).subscribe(ts); + assertEquals(9, requested.get()); + ts.requestMore(3); + assertEquals(6, requested.get()); + } + + + @Test + public void testProducerRequestThroughBufferWithSize4() { + TestSubscriber> ts = new TestSubscriber>(); + final AtomicLong requested = new AtomicLong(); + Observable.create(new Observable.OnSubscribe() { + + @Override + public void call(Subscriber s) { + s.setProducer(new Producer() { + + @Override + public void request(long n) { + requested.set(n); + } + + }); + } + + }).buffer(5, 2).subscribe(ts); + assertEquals(Long.MAX_VALUE, requested.get()); + } } From e89a39fbf3fe37b735067e367308e4b91809622a Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 31 Jul 2014 23:14:11 +0800 Subject: [PATCH 2/2] Handle overflow --- .../operators/OperatorBufferWithSize.java | 38 +++++++- .../operators/OperatorBufferTest.java | 92 ++++++++++++++++++- 2 files changed, 124 insertions(+), 6 deletions(-) diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorBufferWithSize.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorBufferWithSize.java index cde0caaa1c..6f14f4bbf7 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorBufferWithSize.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorBufferWithSize.java @@ -19,7 +19,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import rx.Observable; import rx.Observable.Operator; @@ -73,9 +72,17 @@ public Subscriber call(final Subscriber> child) { @Override public void setProducer(final Producer producer) { child.setProducer(new Producer() { + + private volatile boolean infinite = false; + @Override public void request(long n) { - if (n == Long.MAX_VALUE) { + if (infinite) { + return; + } + if (n >= Long.MAX_VALUE / count) { + // n == Long.MAX_VALUE or n * count >= Long.MAX_VALUE + infinite = true; producer.request(Long.MAX_VALUE); } else { producer.request(n * count); @@ -127,14 +134,30 @@ public void onCompleted() { public void setProducer(final Producer producer) { child.setProducer(new Producer() { - private final AtomicBoolean firstRequest = new AtomicBoolean(false); + private volatile boolean firstRequest = true; + private volatile boolean infinite = false; + + private void requestInfinite() { + infinite = true; + producer.request(Long.MAX_VALUE); + } @Override public void request(long n) { + if (infinite) { + return; + } if (n == Long.MAX_VALUE) { - producer.request(Long.MAX_VALUE); + requestInfinite(); + return; } else { - if (firstRequest.compareAndSet(false, true)) { + if (firstRequest) { + firstRequest = false; + if (n - 1 >= (Long.MAX_VALUE - count) / skip) { + // count + skip * (n - 1) >= Long.MAX_VALUE + requestInfinite(); + return; + } // count = 5, skip = 2, n = 3 // * * * * * // * * * * * @@ -142,6 +165,11 @@ public void request(long n) { // request = 5 + 2 * ( 3 - 1) producer.request(count + skip * (n - 1)); } else { + if (n >= Long.MAX_VALUE / skip) { + // skip * n >= Long.MAX_VALUE + requestInfinite(); + return; + } // count = 5, skip = 2, n = 3 // (* * *) * * // ( *) * * * * diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorBufferTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorBufferTest.java index 757af07fb5..cfa38d425e 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorBufferTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorBufferTest.java @@ -869,7 +869,6 @@ public void request(long n) { assertEquals(6, requested.get()); } - @Test public void testProducerRequestThroughBufferWithSize4() { TestSubscriber> ts = new TestSubscriber>(); @@ -891,4 +890,95 @@ public void request(long n) { }).buffer(5, 2).subscribe(ts); assertEquals(Long.MAX_VALUE, requested.get()); } + + + @Test + public void testProducerRequestOverflowThroughBufferWithSize1() { + TestSubscriber> ts = new TestSubscriber>(); + ts.requestMore(Long.MAX_VALUE / 2); + final AtomicLong requested = new AtomicLong(); + Observable.create(new Observable.OnSubscribe() { + + @Override + public void call(Subscriber s) { + s.setProducer(new Producer() { + + @Override + public void request(long n) { + requested.set(n); + } + + }); + } + + }).buffer(3, 3).subscribe(ts); + assertEquals(Long.MAX_VALUE, requested.get()); + } + + @Test + public void testProducerRequestOverflowThroughBufferWithSize2() { + TestSubscriber> ts = new TestSubscriber>(); + ts.requestMore(Long.MAX_VALUE / 2); + final AtomicLong requested = new AtomicLong(); + Observable.create(new Observable.OnSubscribe() { + + @Override + public void call(Subscriber s) { + s.setProducer(new Producer() { + + @Override + public void request(long n) { + requested.set(n); + } + + }); + } + + }).buffer(3, 2).subscribe(ts); + assertEquals(Long.MAX_VALUE, requested.get()); + } + + @Test + public void testProducerRequestOverflowThroughBufferWithSize3() { + final AtomicLong requested = new AtomicLong(); + Observable.create(new Observable.OnSubscribe() { + + @Override + public void call(final Subscriber s) { + s.setProducer(new Producer() { + + @Override + public void request(long n) { + requested.set(n); + s.onNext(1); + s.onNext(2); + s.onNext(3); + } + + }); + } + + }).buffer(3, 2).subscribe(new Subscriber>() { + + @Override + public void onStart() { + request(Long.MAX_VALUE / 2 - 4); + } + + @Override + public void onCompleted() { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(List t) { + request(Long.MAX_VALUE / 2); + } + + }); + assertEquals(Long.MAX_VALUE, requested.get()); + } }