diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index b65fd94f5b..1d70a39bb1 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -8896,7 +8896,8 @@ public final Observable> window(Func0 *
*
Backpressure Support:
- *
This operator does not support backpressure as it uses {@code count} to control data flow.
+ *
The operator honors backpressure on its outer subscriber, ignores backpressure in its inner Observables + * but each of them will emit at most {@code count} elements.
*
Scheduler:
*
This version of {@code window} does not operate by default on a particular {@link Scheduler}.
*
@@ -8920,7 +8921,8 @@ public final Observable> window(int count) { * *
*
Backpressure Support:
- *
This operator does not support backpressure as it uses {@code count} to control data flow.
+ *
The operator has limited backpressure support. If {@code count} == {@code skip}, the operator honors backpressure on its outer subscriber, ignores backpressure in its inner Observables + * but each of them will emit at most {@code count} elements.
*
Scheduler:
*
This version of {@code window} does not operate by default on a particular {@link Scheduler}.
*
diff --git a/src/main/java/rx/internal/operators/OperatorWindowWithSize.java b/src/main/java/rx/internal/operators/OperatorWindowWithSize.java index 83d62fb995..ed22a68bd6 100644 --- a/src/main/java/rx/internal/operators/OperatorWindowWithSize.java +++ b/src/main/java/rx/internal/operators/OperatorWindowWithSize.java @@ -15,18 +15,14 @@ */ package rx.internal.operators; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; +import java.util.*; -import rx.Observable; +import rx.*; import rx.Observable.Operator; -import rx.Subscription; +import rx.Observable; +import rx.Observer; import rx.functions.Action0; import rx.subscriptions.Subscriptions; -import rx.Observer; -import rx.Subscriber; /** * Creates windows of values into the source sequence with skip frequency and size bounds. @@ -78,26 +74,36 @@ public ExactSubscriber(Subscriber> child) { @Override public void call() { // if no window we unsubscribe up otherwise wait until window ends - if(noWindow) { + if (noWindow) { parentSubscription.unsubscribe(); } } })); - } - - @Override - public void onStart() { - // no backpressure as we are controlling data flow by window size - request(Long.MAX_VALUE); + child.setProducer(new Producer() { + @Override + public void request(long n) { + if (n > 0) { + long u = n * size; + if (((u >>> 31) != 0) && (u / n != size)) { + u = Long.MAX_VALUE; + } + requestMore(u); + } + } + }); } + void requestMore(long n) { + request(n); + } + @Override public void onNext(T t) { if (window == null) { noWindow = false; window = BufferUntilSubscriber.create(); - child.onNext(window); + child.onNext(window); } window.onNext(t); if (++count % size == 0) { diff --git a/src/test/java/rx/internal/operators/OperatorWindowWithSizeTest.java b/src/test/java/rx/internal/operators/OperatorWindowWithSizeTest.java index f23da00145..ed8333e5ec 100644 --- a/src/test/java/rx/internal/operators/OperatorWindowWithSizeTest.java +++ b/src/test/java/rx/internal/operators/OperatorWindowWithSizeTest.java @@ -15,20 +15,19 @@ */ package rx.internal.operators; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; +import static org.mockito.Mockito.*; +import rx.*; import rx.Observable; -import rx.functions.Action1; -import rx.functions.Func1; +import rx.Observer; +import rx.functions.*; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; @@ -198,5 +197,52 @@ private List list(String... args) { } return list; } + + @Test + public void testBackpressureOuter() { + Observable> source = Observable.range(1, 10).window(3); + + final List list = new ArrayList(); + + @SuppressWarnings("unchecked") + final Observer o = mock(Observer.class); + + source.subscribe(new Subscriber>() { + @Override + public void onStart() { + request(1); + } + @Override + public void onNext(Observable t) { + t.subscribe(new Observer() { + @Override + public void onNext(Integer t) { + list.add(t); + } + @Override + public void onError(Throwable e) { + o.onError(e); + } + @Override + public void onCompleted() { + o.onCompleted(); + } + }); + } + @Override + public void onError(Throwable e) { + o.onError(e); + } + @Override + public void onCompleted() { + o.onCompleted(); + } + }); + + assertEquals(Arrays.asList(1, 2, 3), list); + + verify(o, never()).onError(any(Throwable.class)); + verify(o, times(1)).onCompleted(); // 1 inner + } } \ No newline at end of file