Skip to content

Commit

Permalink
Merge pull request #1489 from benjchristensen/terminal-operator-backp…
Browse files Browse the repository at this point in the history
…ressure

Backpressure Fixes and Docs
  • Loading branch information
benjchristensen committed Jul 23, 2014
2 parents 077d70a + cc8bf6d commit bb56093
Show file tree
Hide file tree
Showing 21 changed files with 524 additions and 3 deletions.
396 changes: 395 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,12 @@ private IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {

@Override
public void request(long n) {
if (REQUESTED_UPDATER.get(this) == Long.MAX_VALUE) {
// already started with fast-path
return;
}
if (n == Long.MAX_VALUE) {
REQUESTED_UPDATER.set(this, n);
// fast-path without backpressure
while (it.hasNext()) {
if (o.isUnsubscribed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,12 @@ private RangeProducer(Subscriber<? super Integer> o, int start, int end) {

@Override
public void request(long n) {
if (REQUESTED_UPDATER.get(this) == Long.MAX_VALUE) {
// already started with fast-path
return;
}
if (n == Long.MAX_VALUE) {
REQUESTED_UPDATER.set(this, n);
// fast-path without backpressure
for (long i = index; i <= end; i++) {
if (o.isUnsubscribed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
return new Subscriber<T>(child) {
final DebounceState<T> state = new DebounceState<T>();
final Subscriber<?> self = this;

@Override
public void onStart() {
// debounce wants to receive everything as a firehose without backpressure
request(Long.MAX_VALUE);
}

@Override
public void onNext(T t) {
Observable<U> debouncer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public void onNext(T t) {
U key = keySelector.call(t);
if (keyMemory.add(key)) {
child.onNext(t);
} else {
request(1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public void onNext(T t) {
if (hasPrevious) {
if (!(currentKey == key || (key != null && key.equals(currentKey)))) {
child.onNext(t);
} else {
request(1);
}
} else {
hasPrevious = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public void onNext(T value) {
if (currentIndex == index) {
subscriber.onNext(value);
subscriber.onCompleted();
} else {
request(1);
}
currentIndex++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ public GroupBySubscriber(Func1<? super T, ? extends K> keySelector, Subscriber<?
static final AtomicIntegerFieldUpdater<GroupBySubscriber> TERMINATED_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "terminated");

@Override
public void onStart() {
/*
* This operator does not support backpressure as splitting a stream effectively turns it into a "hot observable" and
* blocking any one group would block the entire parent stream. If backpressure is needed on individual groups then
* operators such as `onBackpressureDrop` or `onBackpressureBuffer` should be used.
*/
request(Long.MAX_VALUE);
}

@Override
public void onCompleted() {
if (TERMINATED_UPDATER.compareAndSet(this, 0, 1)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ public Subscriber<? super T> call(Subscriber<? super GroupedObservable<K, R>> ch
/** Guarded by guard. */
Map<K, GroupSubject<K, R>> groups = new HashMap<K, GroupSubject<K, R>>();

@Override
public void onStart() {
/*
* This operator does not support backpressure as splitting a stream effectively turns it into a "hot observable" and
* blocking any one group would block the entire parent stream. If backpressure is needed on individual groups then
* operators such as `onBackpressureDrop` or `onBackpressureBuffer` should be used.
*/
request(Long.MAX_VALUE);
}

final Subscriber<T> self = this;
@Override
public void onNext(T t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ static final class SamplerSubscriber<T> extends Subscriber<T> implements Action0
public SamplerSubscriber(Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
}

@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onNext(T t) {
value = t;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public void onNext(T value) {
}
if (deque.size() == count) {
subscriber.onNext(on.getValue(deque.removeFirst()));
} else {
request(1);
}
deque.offerLast(on.next(value));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public void onCompleted() {
public void onNext(T t) {
if (gate.get()) {
s.onNext(t);
} else {
request(1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public void onNext(T t) {
if (!predicate.call(t, index++)) {
skipping = false;
child.onNext(t);
} else {
request(1);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ protected void runEvictionPolicy(long now) {
}
}

// no backpressure up as it wants to receive and discard all but the last
@Override
public void onStart() {
// we do this to break the chain of the child subscriber being passed through
request(Long.MAX_VALUE);
}

@Override
public void onNext(T args) {
long t = scheduler.now();
Expand All @@ -89,6 +96,7 @@ public void onError(Throwable e) {
public void onCompleted() {
runEvictionPolicy(scheduler.now());
try {
// TODO this can be made to support backpressure
for (Timestamped<T> v : buffer) {
subscriber.onNext(v.getValue());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,18 @@ public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {

private long lastOnNext = 0;

@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onNext(T v) {
long now = scheduler.now();
if (lastOnNext == 0 || now - lastOnNext >= timeInMilliseconds) {
lastOnNext = now;
subscriber.onNext(v);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ public Subscriber<? super T> call(final Subscriber<? super Map<K, V>> subscriber

private Map<K, V> map = mapFactory.call();

@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onNext(T v) {
K key = keySelector.call(v);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ public Subscriber<? super T> call(final Subscriber<? super Map<K, Collection<V>>
return new Subscriber<T>(subscriber) {
private Map<K, Collection<V>> map = mapFactory.call();

@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onNext(T v) {
K key = keySelector.call(v);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ public SourceSubscriber(Subscriber<? super Observable<T>> child) {
this.child = new SerializedSubscriber<Observable<T>>(child);
this.guard = new Object();
}

@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onNext(T t) {
List<Object> localQueue;
Expand Down Expand Up @@ -285,6 +291,12 @@ public BoundarySubscriber(Subscriber<?> child, SourceSubscriber<T> sub) {
super(child);
this.sub = sub;
}

@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onNext(U t) {
sub.replaceWindow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public Subscriber<? super T> call(Subscriber<? super Observable<T>> child) {

Subscriber<U> open = new Subscriber<U>(child) {

@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onNext(U t) {
sub.beginWindow(t);
Expand Down Expand Up @@ -100,6 +105,12 @@ public SourceSubscriber(Subscriber<? super Observable<T>> child) {
this.csub = new CompositeSubscription();
child.add(csub);
}

@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onNext(T t) {
List<SerializedSubject<T>> list;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ public ExactSubscriber(Subscriber<? super Observable<T>> child, Worker worker) {
this.guard = new Object();
this.state = State.empty();
}

@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onNext(T t) {
List<Object> localQueue;
Expand Down Expand Up @@ -351,6 +357,11 @@ public InexactSubscriber(Subscriber<? super Observable<T>> child, Worker worker)
this.chunks = new LinkedList<CountedSerializedSubject<T>>();
}

@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onNext(T t) {
List<CountedSerializedSubject<T>> list;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
Expand All @@ -29,12 +30,15 @@

import rx.Observable;
import rx.Observer;
import rx.internal.util.RxRingBuffer;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

public class OperatorSkipLastTest {

@Test
public void testSkipLastEmpty() {
Observable<String> observable = Observable.<String>empty().skipLast(2);
Observable<String> observable = Observable.<String> empty().skipLast(2);

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand Down Expand Up @@ -99,6 +103,17 @@ public void testSkipLastWithNull() {
verify(observer, times(1)).onCompleted();
}

@Test
public void testSkipLastWithBackpressure() {
Observable<Integer> o = Observable.range(0, RxRingBuffer.SIZE * 2).skipLast(RxRingBuffer.SIZE + 10);
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
o.observeOn(Schedulers.computation()).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();
assertEquals((RxRingBuffer.SIZE) - 10, ts.getOnNextEvents().size());

}

@Test(expected = IndexOutOfBoundsException.class)
public void testSkipLastWithNegativeCount() {
Observable.from("one").skipLast(-1);
Expand Down

0 comments on commit bb56093

Please sign in to comment.