Skip to content

Commit

Permalink
1.x: overhead reduction for merge and flatMap
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Nov 10, 2015
1 parent 9bc1987 commit a8ce5e3
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 9 deletions.
20 changes: 13 additions & 7 deletions src/main/java/rx/internal/operators/OnSubscribeRange.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,16 @@ public void request(long n) {
*/
void slowpath(long r) {
long idx = index;
long e = 0L;
while (true) {
/*
* This complicated logic is done to avoid touching the volatile `index` and `requested` values
* during the loop itself. If they are touched during the loop the performance is impacted significantly.
*/
long fs = end - idx + 1;
long e = Math.min(fs, r);
final boolean complete = fs <= r;

fs = e + idx;
fs = Math.min(fs, r) + idx;
final Subscriber<? super Integer> o = this.o;

for (long i = idx; i != fs; i++) {
Expand All @@ -102,13 +102,19 @@ void slowpath(long r) {
return;
}

e -= fs - idx;

idx = fs;
index = fs;

r = addAndGet(-e);
r = get() + e;
if (r == 0L) {
// we're done emitting the number requested so return
return;
index = fs;
r = addAndGet(e);
if (r == 0L) {
// we're done emitting the number requested so return
return;
}
e = 0L;
}
}
}
Expand All @@ -131,4 +137,4 @@ void fastpath() {
}
}

}
}
21 changes: 19 additions & 2 deletions src/main/java/rx/internal/operators/OperatorMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,28 @@ static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>
/** An empty array to avoid creating new empty arrays in removeInner. */
static final InnerSubscriber<?>[] EMPTY = new InnerSubscriber<?>[0];

int scalarEmission;

final int scalarLimit;

public MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int maxConcurrent) {
this.child = child;
this.delayErrors = delayErrors;
this.maxConcurrent = maxConcurrent;
this.nl = NotificationLite.instance();
this.innerGuard = new Object();
this.innerSubscribers = EMPTY;
request(maxConcurrent == Integer.MAX_VALUE ? Long.MAX_VALUE : maxConcurrent);
long r;
int lim;
if (maxConcurrent == Integer.MAX_VALUE) {
r = Long.MAX_VALUE;
lim = Integer.MAX_VALUE;
} else {
r = maxConcurrent;
lim = Math.max(maxConcurrent >> 1, 1);
}
this.scalarLimit = lim;
request(r);
}

Queue<Throwable> getOrCreateErrorQueue() {
Expand Down Expand Up @@ -488,7 +502,10 @@ protected void emitScalar(T value, long r) {
if (r != Long.MAX_VALUE) {
producer.produced(1);
}
this.requestMore(1);
if (++scalarEmission == scalarLimit) {
scalarEmission = 0;
this.requestMore(scalarLimit);
}
// check if some state changed while emitting
synchronized (this) {
skipFinal = true;
Expand Down

0 comments on commit a8ce5e3

Please sign in to comment.