Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add onBackpressureBuffer with capacity #1928

Merged
merged 1 commit into from
Dec 9, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.*;
import java.util.concurrent.*;

import rx.annotations.Beta;
import rx.annotations.Experimental;
import rx.exceptions.*;
import rx.functions.*;
Expand Down Expand Up @@ -5035,6 +5036,48 @@ public final Observable<T> onBackpressureBuffer() {
return lift(new OperatorOnBackpressureBuffer<T>());
}

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to buffer
* up to a given amount of items until they can be emitted. The resulting Observable will {@code onError} emitting a
* {@link java.nio.BufferOverflowException} as soon as the buffer's capacity is exceeded, dropping all
* undelivered items, and unsubscribing from the source.
* <p>
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.buffer.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return the source Observable modified to buffer items up to the given capacity.
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
* @Beta
*/
@Beta
public final Observable<T> onBackpressureBuffer(long capacity) {
return lift(new OperatorOnBackpressureBuffer<T>(capacity));
}

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to buffer
* up to a given amount of items until they can be emitted. The resulting Observable will {@code onError} emitting a
* {@link java.nio.BufferOverflowException} as soon as the buffer's capacity is exceeded, dropping all
* undelivered items, unsubscribing from the source, and notifying the producer with {@code onOverflow}.
* <p>
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.buffer.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return the source Observable modified to buffer items up to the given capacity.
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
* @Beta
*/
@Beta
public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflow) {
return lift(new OperatorOnBackpressureBuffer<T>(capacity, onOverflow));
}

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to discard,
* rather than emit, those items that its observer is not prepared to observe.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,44 @@

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
import rx.exceptions.MissingBackpressureException;
import rx.functions.Action0;

public class OperatorOnBackpressureBuffer<T> implements Operator<T, T> {

private final NotificationLite<T> on = NotificationLite.instance();

private final Long capacity;
private final Action0 onOverflow;

public OperatorOnBackpressureBuffer() {
this.capacity = null;
this.onOverflow = null;
}

public OperatorOnBackpressureBuffer(long capacity) {
this(capacity, null);
}

public OperatorOnBackpressureBuffer(long capacity, Action0 onOverflow) {
if (capacity <= 0) {
throw new IllegalArgumentException("Buffer capacity must be > 0");
}
this.capacity = capacity;
this.onOverflow = onOverflow;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
// TODO get a different queue implementation
// TODO start with size hint
final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
final AtomicLong capacity = (this.capacity == null) ? null : new AtomicLong(this.capacity);
final AtomicLong wip = new AtomicLong();
final AtomicLong requested = new AtomicLong();

Expand All @@ -40,37 +63,71 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
@Override
public void request(long n) {
if (requested.getAndAdd(n) == 0) {
pollQueue(wip, requested, queue, child);
pollQueue(wip, requested, capacity, queue, child);
}
}

});
// don't pass through subscriber as we are async and doing queue draining
// a parent being unsubscribed should not affect the children
Subscriber<T> parent = new Subscriber<T>() {

private AtomicBoolean saturated = new AtomicBoolean(false);

@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onCompleted() {
queue.offer(on.completed());
pollQueue(wip, requested, queue, child);
if (!saturated.get()) {
queue.offer(on.completed());
pollQueue(wip, requested, capacity, queue, child);
}
}

@Override
public void onError(Throwable e) {
queue.offer(on.error(e));
pollQueue(wip, requested, queue, child);
if (!saturated.get()) {
queue.offer(on.error(e));
pollQueue(wip, requested, capacity, queue, child);
}
}

@Override
public void onNext(T t) {
if (!assertCapacity()) {
return;
}
queue.offer(on.next(t));
pollQueue(wip, requested, queue, child);
pollQueue(wip, requested, capacity, queue, child);
}

private boolean assertCapacity() {
if (capacity == null) {
return true;
}

long currCapacity;
do {
currCapacity = capacity.get();
if (currCapacity <= 0) {
if (saturated.compareAndSet(false, true)) {
unsubscribe();
child.onError(new MissingBackpressureException(
"Overflowed buffer of "
+ OperatorOnBackpressureBuffer.this.capacity));
if (onOverflow != null) {
onOverflow.call();
}
}
return false;
}
// ensure no other thread stole our slot, or retry
} while (!capacity.compareAndSet(currCapacity, currCapacity - 1));
return true;
}
};

// if child unsubscribes it should unsubscribe the parent, but not the other way around
Expand All @@ -79,7 +136,7 @@ public void onNext(T t) {
return parent;
}

private void pollQueue(AtomicLong wip, AtomicLong requested, Queue<Object> queue, Subscriber<? super T> child) {
private void pollQueue(AtomicLong wip, AtomicLong requested, AtomicLong capacity, Queue<Object> queue, Subscriber<? super T> child) {
// TODO can we do this without putting everything in the queue first so we can fast-path the case when we don't need to queue?
if (requested.get() > 0) {
// only one draining at a time
Expand All @@ -96,6 +153,9 @@ private void pollQueue(AtomicLong wip, AtomicLong requested, Queue<Object> queue
requested.incrementAndGet();
return;
}
if (capacity != null) { // it's bounded
capacity.incrementAndGet();
}
on.accept(child, o);
} else {
// we hit the end ... so increment back to 0 again
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,24 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertEquals;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.MissingBackpressureException;
import rx.functions.Action0;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class OperatorOnBackpressureBufferTest {

@Test
Expand Down Expand Up @@ -81,6 +86,56 @@ public void onNext(Long t) {
assertEquals(499, ts.getOnNextEvents().get(499).intValue());
}

@Test(expected = IllegalArgumentException.class)
public void testFixBackpressureBufferNegativeCapacity() throws InterruptedException {
Observable.empty().onBackpressureBuffer(-1);
}

@Test(expected = IllegalArgumentException.class)
public void testFixBackpressureBufferZeroCapacity() throws InterruptedException {
Observable.empty().onBackpressureBuffer(-1);
}

@Test
public void testFixBackpressureBoundedBuffer() throws InterruptedException {
final CountDownLatch l1 = new CountDownLatch(100);
final CountDownLatch backpressureCallback = new CountDownLatch(1);
TestSubscriber<Long> ts = new TestSubscriber<Long>(new Observer<Long>() {

@Override
public void onCompleted() { }

@Override
public void onError(Throwable e) { }

@Override
public void onNext(Long t) {
l1.countDown();
}

});

ts.requestMore(100);
Subscription s = infinite.subscribeOn(Schedulers.computation())
.onBackpressureBuffer(500, new Action0() {
@Override
public void call() {
backpressureCallback.countDown();
}
}).take(1000).subscribe(ts);
l1.await();

ts.requestMore(50);

assertTrue(backpressureCallback.await(500, TimeUnit.MILLISECONDS));
assertTrue(ts.getOnErrorEvents().get(0) instanceof MissingBackpressureException);

int size = ts.getOnNextEvents().size();
assertTrue(size <= 150); // will get up to 50 more
assertTrue(ts.getOnNextEvents().get(size-1) == size-1);
assertTrue(s.isUnsubscribed());
}

static final Observable<Long> infinite = Observable.create(new OnSubscribe<Long>() {

@Override
Expand All @@ -92,4 +147,5 @@ public void call(Subscriber<? super Long> s) {
}

});

}