Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zip with Backpressure Support #1446

Merged
merged 3 commits into from
Jul 17, 2014
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 153 additions & 89 deletions rxjava-core/src/main/java/rx/internal/operators/OperatorZip.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
*/
package rx.internal.operators;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import rx.Observable;
import rx.Observable.Operator;
import rx.Observer;
import rx.Producer;
import rx.Subscriber;
import rx.exceptions.MissingBackpressureException;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Func2;
import rx.functions.Func3;
Expand All @@ -33,6 +35,7 @@
import rx.functions.Func9;
import rx.functions.FuncN;
import rx.functions.Functions;
import rx.internal.util.RxRingBuffer;
import rx.subscriptions.CompositeSubscription;

/**
Expand All @@ -48,7 +51,9 @@
* <p>
* The resulting Observable returned from zip will invoke <code>onNext</code> as many times as the
* number of <code>onNext</code> invocations of the source Observable that emits the fewest items.
* @param <R> the result type
*
* @param <R>
* the result type
*/
public final class OperatorZip<R> implements Operator<R, Observable<?>[]> {
/*
Expand Down Expand Up @@ -104,69 +109,106 @@ public OperatorZip(Func9 f) {

@SuppressWarnings("rawtypes")
@Override
public Subscriber<? super Observable[]> call(final Subscriber<? super R> observer) {
return new Subscriber<Observable[]>(observer) {
public Subscriber<? super Observable[]> call(final Subscriber<? super R> child) {
final Zip<R> zipper = new Zip<R>(child, zipFunction);
final ZipProducer<R> producer = new ZipProducer<R>(zipper);
child.setProducer(producer);
final ZipSubscriber subscriber = new ZipSubscriber(child, zipper, producer);
return subscriber;
}

boolean started = false;
private final class ZipSubscriber extends Subscriber<Observable[]> {

@Override
public void onCompleted() {
if (!started) {
// this means we have not received a valid onNext before termination so we emit the onCompleted
observer.onCompleted();
}
}
final Subscriber<? super R> child;
final Zip<R> zipper;
final ZipProducer<R> producer;

@Override
public void onError(Throwable e) {
observer.onError(e);
public ZipSubscriber(Subscriber<? super R> child, Zip<R> zipper, ZipProducer<R> producer) {
super(child);
this.child = child;
this.zipper = zipper;
this.producer = producer;
}

boolean started = false;

@Override
public void onCompleted() {
if (!started) {
// this means we have not received a valid onNext before termination so we emit the onCompleted
child.onCompleted();
}
}

@Override
public void onNext(Observable[] observables) {
if (observables == null || observables.length == 0) {
observer.onCompleted();
} else {
started = true;
new Zip<R>(observables, observer, zipFunction).zip();
}
@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(Observable[] observables) {
if (observables == null || observables.length == 0) {
child.onCompleted();
} else {
started = true;
zipper.start(observables, producer);
}
}

}

private static final class ZipProducer<R> extends AtomicLong implements Producer {

private Zip<R> zipper;

public ZipProducer(Zip<R> zipper) {
this.zipper = zipper;
}

/**
 * Records {@code n} additional items of downstream demand, then asks the zipper to emit.
 * <p>
 * The outstanding demand is this {@link AtomicLong}'s own value; the zipper reads it and
 * only emits while it is greater than zero. The addition therefore saturates at
 * {@link Long#MAX_VALUE} instead of wrapping around: a plain {@code addAndGet} would turn
 * accumulated large requests (for example two calls with {@code Long.MAX_VALUE}) into a
 * negative demand, after which nothing would ever be emitted.
 *
 * @param n
 *            the number of additional items the downstream is prepared to receive
 */
@Override
public void request(long n) {
    long current;
    long updated;
    do {
        current = get();
        updated = current + n;
        if (n > 0 && updated < current) {
            // the sum wrapped past Long.MAX_VALUE; clamp so demand stays positive
            updated = Long.MAX_VALUE;
        }
    } while (!compareAndSet(current, updated));
    // try and claim emission if no other threads are doing so
    zipper.tick();
}

};
}

static final NotificationLite<Object> on = NotificationLite.instance();
private static final class Zip<R> {
@SuppressWarnings("rawtypes")
final Observable[] os;
final Object[] observers;
final Observer<? super R> observer;
final FuncN<? extends R> zipFunction;
final CompositeSubscription childSubscription = new CompositeSubscription();
private final Observer<? super R> child;
private final FuncN<? extends R> zipFunction;
private final CompositeSubscription childSubscription = new CompositeSubscription();

volatile long counter;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<Zip> COUNTER_UPDATER
= AtomicLongFieldUpdater.newUpdater(Zip.class, "counter");
static final AtomicLongFieldUpdater<Zip> COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(Zip.class, "counter");

static final int THRESHOLD = (int) (RxRingBuffer.SIZE * 0.7);
int emitted = 0; // not volatile/synchronized as accessed inside COUNTER_UPDATER block

/* initialized when started in `start` */
private Object[] observers;
private AtomicLong requested;

@SuppressWarnings("rawtypes")
public Zip(Observable[] os, final Subscriber<? super R> observer, FuncN<? extends R> zipFunction) {
this.os = os;
this.observer = observer;
public Zip(final Subscriber<? super R> child, FuncN<? extends R> zipFunction) {
this.child = child;
this.zipFunction = zipFunction;
child.add(childSubscription);
}

@SuppressWarnings("unchecked")
public void start(@SuppressWarnings("rawtypes") Observable[] os, AtomicLong requested) {
observers = new Object[os.length];
this.requested = requested;
for (int i = 0; i < os.length; i++) {
InnerObserver io = new InnerObserver();
InnerSubscriber io = new InnerSubscriber();
observers[i] = io;
childSubscription.add(io);
}

observer.add(childSubscription);
}

@SuppressWarnings("unchecked")
public void zip() {
for (int i = 0; i < os.length; i++) {
os[i].unsafeSubscribe((InnerObserver) observers[i]);
os[i].unsafeSubscribe((InnerSubscriber) observers[i]);
}
}

Expand All @@ -179,51 +221,64 @@ public void zip() {
*/
@SuppressWarnings("unchecked")
void tick() {
if (observers == null) {
// nothing yet to do (initial request from Producer)
return;
}
if (COUNTER_UPDATER.getAndIncrement(this) == 0) {
do {
final Object[] vs = new Object[observers.length];
boolean allHaveValues = true;
for (int i = 0; i < observers.length; i++) {
Object n = ((InnerObserver) observers[i]).items.peek();

if (n == null) {
allHaveValues = false;
continue;
}
// we only emit if requested > 0
if (requested.get() > 0) {
final Object[] vs = new Object[observers.length];
boolean allHaveValues = true;
for (int i = 0; i < observers.length; i++) {
RxRingBuffer buffer = ((InnerSubscriber) observers[i]).items;
Object n = buffer.peek();

if (n == null) {
allHaveValues = false;
continue;
}

switch (on.kind(n)) {
case OnNext:
vs[i] = on.getValue(n);
break;
case OnCompleted:
observer.onCompleted();
// we need to unsubscribe from all children since children are
// independently subscribed
childSubscription.unsubscribe();
return;
default:
// shouldn't get here
}
}
if (allHaveValues) {
try {
// all have something so emit
observer.onNext(zipFunction.call(vs));
} catch (Throwable e) {
observer.onError(OnErrorThrowable.addValueAsLastCause(e, vs));
return;
}
// now remove them
for (Object obj : observers) {
InnerObserver io = (InnerObserver)obj;
io.items.poll();
// eagerly check if the next item on this queue is an onComplete
if (on.isCompleted(io.items.peek())) {
// it is an onComplete so shut down
observer.onCompleted();
// we need to unsubscribe from all children since children are independently subscribed
if (buffer.isCompleted(n)) {
child.onCompleted();
// we need to unsubscribe from all children since children are
// independently subscribed
childSubscription.unsubscribe();
return;
} else {
vs[i] = buffer.getValue(n);
}
}
if (allHaveValues) {
try {
// all have something so emit
child.onNext(zipFunction.call(vs));
// we emitted so decrement the requested counter
requested.decrementAndGet();
emitted++;
} catch (Throwable e) {
child.onError(OnErrorThrowable.addValueAsLastCause(e, vs));
return;
}
// now remove them
for (Object obj : observers) {
RxRingBuffer buffer = ((InnerSubscriber) obj).items;
buffer.poll();
// eagerly check if the next item on this queue is an onComplete
if (buffer.isCompleted(buffer.peek())) {
// it is an onComplete so shut down
child.onCompleted();
// we need to unsubscribe from all children since children are independently subscribed
childSubscription.unsubscribe();
return;
}
}
if (emitted > THRESHOLD) {
for (Object obj : observers) {
((InnerSubscriber) obj).request(emitted);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we requesting more data from sources if the downstream didn't ask for values? It looks as if request(n) is used as a repeated 'batching' operation. I guess this was required for backpressure-unaware downstream consumers, which would "hang" unless new data is requested every now and then.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far I have treated upstream and downstream as decoupled on async operators like this with a buffer in them. Even if nothing further is requested from downstream, it tries to keep the buffers full. This is similar to merge and observeOn. This is done because it can't know when a request will come in, but assumes one will and keeps filling the buffers so the data is available when the downstream requests it.

I'm open to exploring alternatives if performance testing shows a different approach is better for throughput.

You'll also notice that it does not perform the request until after it passes the THRESHOLD value. This is done so it requests in batches rather than one at a time, which can be more efficient in certain use cases.

What are you recommending doing differently?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of introducing buffers, I'd keep the current unbounded operator behavior on the onNext stream for most operators and only decorate the request(n) stream going backwards to the source; basically what I did in my zip operator. Otherwise, I'd use a batch(n) operator I suggested.

But currently, I only considered from and range as the sources where this simplified request mangling is straightforward. Are there any other RxJava sources?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having unbounded buffers defeats the point of backpressure and makes it non-deterministic, from the perspective of a user, whether buffer bloat will occur. The idea is to eliminate unbounded buffers unless the user asked for one, such as in replay, toList or onBackpressureBuffer.

}
emitted = 0;
}
}
}
Expand All @@ -235,27 +290,36 @@ void tick() {
// used to observe each Observable we are zipping together
// it collects all items in an internal queue
@SuppressWarnings("rawtypes")
final class InnerObserver extends Subscriber {
final class InnerSubscriber extends Subscriber {
// Concurrent* since we need to read it from across threads
final ConcurrentLinkedQueue items = new ConcurrentLinkedQueue();
final RxRingBuffer items = RxRingBuffer.getSpmcInstance();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is at most 1 consumer and 1 producer thread, why the SPMC queue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oversight on my part, this should work as SPSC. I'll make the change.


@Override
public void onStart() {
request(RxRingBuffer.SIZE);
}

@SuppressWarnings("unchecked")
@Override
public void onCompleted() {
items.add(on.completed());
items.onCompleted();
tick();
}

@Override
public void onError(Throwable e) {
// emit error and shut down
observer.onError(e);
// emit error immediately and shut down
child.onError(e);
}

@SuppressWarnings("unchecked")
@Override
public void onNext(Object t) {
items.add(on.next(t));
try {
items.onNext(t);
} catch (MissingBackpressureException e) {
onError(e);
}
tick();
}
};
Expand Down
26 changes: 20 additions & 6 deletions rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -276,27 +276,37 @@ public Object poll() {
Object o;
o = queue.poll();
/*
* benjchristensen July 10 2014 => The check for 'queue.size() == 0' came from a very rare concurrency bug where poll()
* benjchristensen July 10 2014 => The check for 'queue.isEmpty()' came from a very rare concurrency bug where poll()
* is invoked, then an "onNext + onCompleted/onError" arrives before hitting the if check below. In that case,
* "o == null" and there is a terminal state, but now "queue.size() > 0" and we should NOT return the terminalState.
* "o == null" and there is a terminal state, but now "!queue.isEmpty()" and we should NOT return the terminalState.
*
* The queue.size() check is a double-check that works to handle this, without needing to synchronize poll with on*
* or needing to enqueue terminalState.
*
* This did make me consider eliminating the 'terminalState' ref and enqueuing it ... but then that requires
* a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it
* is currently the way it is.
*
* This performs fine as long as we don't use a queue implementation where the size() impl has to scan the whole list,
* such as ConcurrentLinkedQueue.
*/
if (o == null && terminalState != null && queue.size() == 0) {
if (o == null && terminalState != null && queue.isEmpty()) {
o = terminalState;
// once emitted we clear so a poll loop will finish
terminalState = null;
}
return o;
}

/**
 * Returns the next item without removing it.
 * <p>
 * Yields {@code null} when the underlying queue has been released. When the queue has no
 * element to offer but a terminal state has been recorded, that terminal state is returned
 * instead. Unlike {@code poll()}, this method does not clear the terminal state, so it can
 * be observed again by a later call.
 *
 * @return the head of the queue, the recorded terminal state, or {@code null} if neither
 *         is available
 */
public Object peek() {
// NOTE(review): 'queue' is read several times below; if it can be released concurrently,
// a local copy may be needed to avoid a NullPointerException — confirm against the field's usage
if (queue == null) {
// we are unsubscribed and have released the underlying queue
return null;
}
Object o;
o = queue.peek();
// same double-check as in poll(): only report the terminal state if the queue is still empty
if (o == null && terminalState != null && queue.isEmpty()) {
o = terminalState;
}
return o;
}

public boolean isCompleted(Object o) {
return on.isCompleted(o);
Expand All @@ -306,6 +316,10 @@ public boolean isError(Object o) {
return on.isError(o);
}

/**
 * Unwraps a buffered item by delegating to {@code on.getValue}.
 *
 * @param o
 *            the item to unwrap
 * @return whatever {@code on.getValue(o)} yields for that item
 */
public Object getValue(Object o) {
    final Object unwrapped = on.getValue(o);
    return unwrapped;
}

public boolean accept(Object o, Observer child) {
return on.accept(child, o);
}
Expand Down
Loading