Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scan backpressure and first emission fix #3171

Merged
merged 1 commit into from
Aug 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
340 changes: 271 additions & 69 deletions src/main/java/rx/internal/operators/OperatorScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@
*/
package rx.internal.operators;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Queue;

import rx.*;
import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Func0;
import rx.functions.Func2;
import rx.exceptions.*;
import rx.functions.*;
import rx.internal.util.atomic.SpscLinkedAtomicQueue;
import rx.internal.util.unsafe.*;

/**
* Returns an Observable that applies a function to the first item emitted by a source Observable, then feeds
Expand Down Expand Up @@ -87,87 +86,290 @@ public OperatorScan(final Func2<R, ? super T, R> accumulator) {

@Override
public Subscriber<? super T> call(final Subscriber<? super R> child) {
return new Subscriber<T>(child) {
private final R initialValue = initialValueFactory.call();
final R initialValue = initialValueFactory.call();

if (initialValue == NO_INITIAL_VALUE) {
return new Subscriber<T>(child) {
boolean once;
R value;
@SuppressWarnings("unchecked")
@Override
public void onNext(T t) {
R v;
if (!once) {
once = true;
v = (R)t;
} else {
v = value;
try {
v = accumulator.call(v, t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
child.onError(OnErrorThrowable.addValueAsLastCause(e, t));
return;
}
}
value = v;
child.onNext(v);
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onCompleted() {
child.onCompleted();
}
};
}

final InitialProducer<R> ip = new InitialProducer<R>(initialValue, child);

Subscriber<T> parent = new Subscriber<T>() {
private R value = initialValue;
boolean initialized = false;

@SuppressWarnings("unchecked")
@Override
public void onNext(T currentValue) {
emitInitialValueIfNeeded(child);

if (this.value == NO_INITIAL_VALUE) {
// if there is NO_INITIAL_VALUE then we know it is type T for both so cast T to R
this.value = (R) currentValue;
} else {
try {
this.value = accumulator.call(this.value, currentValue);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
child.onError(OnErrorThrowable.addValueAsLastCause(e, currentValue));
return;
}
R v = value;
try {
v = accumulator.call(v, currentValue);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(OnErrorThrowable.addValueAsLastCause(e, currentValue));
return;
}
child.onNext(this.value);
value = v;
ip.onNext(v);
}

@Override
public void onError(Throwable e) {
child.onError(e);
ip.onError(e);
}

@Override
public void onCompleted() {
emitInitialValueIfNeeded(child);
child.onCompleted();
}

private void emitInitialValueIfNeeded(final Subscriber<? super R> child) {
if (!initialized) {
initialized = true;
// we emit first time through if we have an initial value
if (initialValue != NO_INITIAL_VALUE) {
child.onNext(initialValue);
}
}
ip.onCompleted();
}

/**
* We want to adjust the requested value by subtracting 1 if we have an initial value
*/
@Override
public void setProducer(final Producer producer) {
child.setProducer(new Producer() {

final AtomicBoolean once = new AtomicBoolean();

final AtomicBoolean excessive = new AtomicBoolean();

@Override
public void request(long n) {
if (once.compareAndSet(false, true)) {
if (initialValue == NO_INITIAL_VALUE || n == Long.MAX_VALUE) {
producer.request(n);
} else if (n == 1) {
excessive.set(true);
producer.request(1); // request at least 1
} else {
// n != Long.MAX_VALUE && n != 1
producer.request(n - 1);
}
} else {
// pass-thru after first time
if (n > 1 // avoid to request 0
&& excessive.compareAndSet(true, false) && n != Long.MAX_VALUE) {
producer.request(n - 1);
} else {
producer.request(n);
}
ip.setProducer(producer);
}
};

child.add(parent);
child.setProducer(ip);
return parent;
}

static final class InitialProducer<R> implements Producer, Observer<R> {
final Subscriber<? super R> child;
final Queue<Object> queue;

boolean emitting;
/** Missed a terminal event. */
boolean missed;
/** Missed a request. */
long missedRequested;
/** Missed a producer. */
Producer missedProducer;
/** The current requested amount. */
long requested;
/** The current producer. */
Producer producer;

volatile boolean done;
Throwable error;

public InitialProducer(R initialValue, Subscriber<? super R> child) {
this.child = child;
Queue<Object> q;
// TODO switch to the linked-array based queue once available
if (UnsafeAccess.isUnsafeAvailable()) {
q = new SpscLinkedQueue<Object>(); // new SpscUnboundedArrayQueue<R>(8);
} else {
q = new SpscLinkedAtomicQueue<Object>(); // new SpscUnboundedAtomicArrayQueue<R>(8);
}
this.queue = q;
q.offer(initialValue);
}

@Override
public void request(long n) {
if (n < 0L) {
throw new IllegalArgumentException("n >= required but it was " + n);
} else
if (n != 0L) {
synchronized (this) {
if (emitting) {
long mr = missedRequested;
long mu = mr + n;
if (mu < 0L) {
mu = Long.MAX_VALUE;
}
missedRequested = mu;
return;
}
});
emitting = true;
}

long r = requested;
long u = r + n;
if (u < 0L) {
u = Long.MAX_VALUE;
}
requested = u;

Producer p = producer;
if (p != null) {
p.request(n);
}

emitLoop();
}
};
}

@Override
public void onNext(R t) {
queue.offer(NotificationLite.instance().next(t));
emit();
}

boolean checkTerminated(boolean d, boolean empty, Subscriber<? super R> child) {
if (child.isUnsubscribed()) {
return true;
}
if (d) {
Throwable err = error;
if (err != null) {
child.onError(err);
return true;
} else
if (empty) {
child.onCompleted();
return true;
}
}
return false;
}

@Override
public void onError(Throwable e) {
error = e;
done = true;
emit();
}

@Override
public void onCompleted() {
done = true;
emit();
}

public void setProducer(Producer p) {
if (p == null) {
throw new NullPointerException();
}
synchronized (this) {
if (emitting) {
missedProducer = p;
return;
}
emitting = true;
}
producer = p;
long r = requested;
if (r != 0L) {
p.request(r);
}
emitLoop();
}

void emit() {
synchronized (this) {
if (emitting) {
missed = true;
return;
}
emitting = true;
}
emitLoop();
}

void emitLoop() {
final Subscriber<? super R> child = this.child;
final Queue<Object> queue = this.queue;
final NotificationLite<R> nl = NotificationLite.instance();
long r = requested;
for (;;) {
boolean max = r == Long.MAX_VALUE;
boolean d = done;
boolean empty = queue.isEmpty();
if (checkTerminated(d, empty, child)) {
return;
}
while (r != 0L) {
d = done;
Object o = queue.poll();
empty = o == null;
if (checkTerminated(d, empty, child)) {
return;
}
if (empty) {
break;
}
R v = nl.getValue(o);
try {
child.onNext(v);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
child.onError(OnErrorThrowable.addValueAsLastCause(e, v));
return;
}
if (!max) {
r--;
}
}
if (!max) {
requested = r;
}

Producer p;
long mr;
synchronized (this) {
p = missedProducer;
mr = missedRequested;
if (!missed && p == null && mr == 0L) {
emitting = false;
return;
}
missed = false;
missedProducer = null;
missedRequested = 0L;
}

if (mr != 0L && !max) {
long u = r + mr;
if (u < 0L) {
u = Long.MAX_VALUE;
}
requested = u;
r = u;
}

if (p != null) {
producer = p;
if (r != 0L) {
p.request(r);
}
} else {
p = producer;
if (p != null && mr != 0L) {
p.request(mr);
}
}
}
}
}
}
Loading