Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: overhead reduction for merge and flatMap #3476

Merged
merged 1 commit into from
Feb 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 56 additions & 54 deletions src/main/java/rx/internal/operators/OnSubscribeRange.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,108 +25,110 @@
*/
public final class OnSubscribeRange implements OnSubscribe<Integer> {

private final int start;
private final int end;
private final int startIndex;
private final int endIndex;

public OnSubscribeRange(int start, int end) {
this.start = start;
this.end = end;
this.startIndex = start;
this.endIndex = end;
}

@Override
public void call(final Subscriber<? super Integer> o) {
o.setProducer(new RangeProducer(o, start, end));
public void call(final Subscriber<? super Integer> childSubscriber) {
childSubscriber.setProducer(new RangeProducer(childSubscriber, startIndex, endIndex));
}

private static final class RangeProducer extends AtomicLong implements Producer {
/** */
private static final long serialVersionUID = 4114392207069098388L;

private final Subscriber<? super Integer> o;
private final int end;
private long index;
private final Subscriber<? super Integer> childSubscriber;
private final int endOfRange;
private long currentIndex;

RangeProducer(Subscriber<? super Integer> o, int start, int end) {
this.o = o;
this.index = start;
this.end = end;
RangeProducer(Subscriber<? super Integer> childSubscriber, int startIndex, int endIndex) {
this.childSubscriber = childSubscriber;
this.currentIndex = startIndex;
this.endOfRange = endIndex;
}

@Override
public void request(long n) {
public void request(long requestedAmount) {
if (get() == Long.MAX_VALUE) {
// already started with fast-path
return;
}
if (n == Long.MAX_VALUE && compareAndSet(0L, Long.MAX_VALUE)) {
if (requestedAmount == Long.MAX_VALUE && compareAndSet(0L, Long.MAX_VALUE)) {
// fast-path without backpressure
fastpath();
} else if (n > 0L) {
long c = BackpressureUtils.getAndAddRequest(this, n);
} else if (requestedAmount > 0L) {
long c = BackpressureUtils.getAndAddRequest(this, requestedAmount);
if (c == 0L) {
// backpressure is requested
slowpath(n);
slowpath(requestedAmount);
}
}
}

/**
*
* Emits as many values as requested or remaining from the range, whichever is smaller.
*/
void slowpath(long r) {
long idx = index;
while (true) {
/*
* This complicated logic is done to avoid touching the volatile `index` and `requested` values
* during the loop itself. If they are touched during the loop the performance is impacted significantly.
*/
long fs = end - idx + 1;
long e = Math.min(fs, r);
final boolean complete = fs <= r;

fs = e + idx;
final Subscriber<? super Integer> o = this.o;
void slowpath(long requestedAmount) {
long emitted = 0L;
long endIndex = endOfRange + 1L;
long index = currentIndex;

final Subscriber<? super Integer> childSubscriber = this.childSubscriber;

for (;;) {

for (long i = idx; i != fs; i++) {
if (o.isUnsubscribed()) {
while (emitted != requestedAmount && index != endIndex) {
if (childSubscriber.isUnsubscribed()) {
return;
}
o.onNext((int) i);

childSubscriber.onNext((int)index);

index++;
emitted++;
}

if (complete) {
if (o.isUnsubscribed()) {
return;
}
o.onCompleted();
if (childSubscriber.isUnsubscribed()) {
return;
}

idx = fs;
index = fs;

r = addAndGet(-e);
if (r == 0L) {
// we're done emitting the number requested so return
if (index == endIndex) {
childSubscriber.onCompleted();
return;
}

requestedAmount = get();

if (requestedAmount == emitted) {
currentIndex = index;
requestedAmount = addAndGet(-emitted);
if (requestedAmount == 0L) {
break;
}
emitted = 0L;
}
}
}

/**
*
* Emits all remaining values without decrementing the requested amount.
*/
void fastpath() {
final long end = this.end + 1L;
final Subscriber<? super Integer> o = this.o;
for (long i = index; i != end; i++) {
if (o.isUnsubscribed()) {
final long endIndex = this.endOfRange + 1L;
final Subscriber<? super Integer> childSubscriber = this.childSubscriber;
for (long index = currentIndex; index != endIndex; index++) {
if (childSubscriber.isUnsubscribed()) {
return;
}
o.onNext((int) i);
childSubscriber.onNext((int) index);
}
if (!o.isUnsubscribed()) {
o.onCompleted();
if (!childSubscriber.isUnsubscribed()) {
childSubscriber.onCompleted();
}
}
}
Expand Down
22 changes: 20 additions & 2 deletions src/main/java/rx/internal/operators/OperatorMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,24 @@ static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>
/** An empty array to avoid creating new empty arrays in removeInner. */
static final InnerSubscriber<?>[] EMPTY = new InnerSubscriber<?>[0];

final int scalarEmissionLimit;

int scalarEmissionCount;

public MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int maxConcurrent) {
this.child = child;
this.delayErrors = delayErrors;
this.maxConcurrent = maxConcurrent;
this.nl = NotificationLite.instance();
this.innerGuard = new Object();
this.innerSubscribers = EMPTY;
request(maxConcurrent == Integer.MAX_VALUE ? Long.MAX_VALUE : maxConcurrent);
if (maxConcurrent == Integer.MAX_VALUE) {
scalarEmissionLimit = Integer.MAX_VALUE;
request(Long.MAX_VALUE);
} else {
scalarEmissionLimit = Math.max(1, maxConcurrent >> 1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A small question: is there a reason for setting scalarEmissionLimit to maxConcurrent / 2, or is 2 just a magic number?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to use 50%–75% of the prefetch as the re-request amount. It's a heuristic, and its optimal value depends on the source emission pattern. The idea is to amortize a 40-cycle atomic increment by doing it less frequently. Basically, any value above 1 helps reduce the overhead, but if the value is too large, the source-to-consumer path loses its pipelining effects.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying 👍

request(maxConcurrent);
}
}

Queue<Throwable> getOrCreateErrorQueue() {
Expand Down Expand Up @@ -488,7 +498,15 @@ protected void emitScalar(T value, long r) {
if (r != Long.MAX_VALUE) {
producer.produced(1);
}
this.requestMore(1);

int produced = scalarEmissionCount + 1;
if (produced == scalarEmissionLimit) {
scalarEmissionCount = 0;
this.requestMore(produced);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use the const scalarEmissionLimit?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That equals `produced`, which is already a local/register-loaded value.

} else {
scalarEmissionCount = produced;
}

// check if some state changed while emitting
synchronized (this) {
skipFinal = true;
Expand Down