Skip to content

Commit

Permalink
1.x: overhead reduction in range() and merge() operators
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Jan 13, 2016
1 parent ef1c509 commit 3b93232
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 56 deletions.
110 changes: 56 additions & 54 deletions src/main/java/rx/internal/operators/OnSubscribeRange.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,108 +25,110 @@
*/
public final class OnSubscribeRange implements OnSubscribe<Integer> {

private final int start;
private final int end;
private final int startIndex;
private final int endIndex;

public OnSubscribeRange(int start, int end) {
this.start = start;
this.end = end;
this.startIndex = start;
this.endIndex = end;
}

@Override
public void call(final Subscriber<? super Integer> o) {
o.setProducer(new RangeProducer(o, start, end));
public void call(final Subscriber<? super Integer> childSubscriber) {
childSubscriber.setProducer(new RangeProducer(childSubscriber, startIndex, endIndex));
}

private static final class RangeProducer extends AtomicLong implements Producer {
/** */
private static final long serialVersionUID = 4114392207069098388L;

private final Subscriber<? super Integer> o;
private final int end;
private long index;
private final Subscriber<? super Integer> childSubscriber;
private final int endOfRange;
private long currentIndex;

RangeProducer(Subscriber<? super Integer> o, int start, int end) {
this.o = o;
this.index = start;
this.end = end;
RangeProducer(Subscriber<? super Integer> childSubscriber, int startIndex, int endIndex) {
this.childSubscriber = childSubscriber;
this.currentIndex = startIndex;
this.endOfRange = endIndex;
}

@Override
public void request(long n) {
public void request(long requestedAmount) {
if (get() == Long.MAX_VALUE) {
// already started with fast-path
return;
}
if (n == Long.MAX_VALUE && compareAndSet(0L, Long.MAX_VALUE)) {
if (requestedAmount == Long.MAX_VALUE && compareAndSet(0L, Long.MAX_VALUE)) {
// fast-path without backpressure
fastpath();
} else if (n > 0L) {
long c = BackpressureUtils.getAndAddRequest(this, n);
} else if (requestedAmount > 0L) {
long c = BackpressureUtils.getAndAddRequest(this, requestedAmount);
if (c == 0L) {
// backpressure is requested
slowpath(n);
slowpath(requestedAmount);
}
}
}

/**
*
* Emits as many values as requested or remaining from the range, whichever is smaller.
*/
void slowpath(long r) {
long idx = index;
while (true) {
/*
* This complicated logic is done to avoid touching the volatile `index` and `requested` values
* during the loop itself. If they are touched during the loop the performance is impacted significantly.
*/
long fs = end - idx + 1;
long e = Math.min(fs, r);
final boolean complete = fs <= r;

fs = e + idx;
final Subscriber<? super Integer> o = this.o;
void slowpath(long requestedAmount) {
long emitted = 0L;
long endIndex = endOfRange + 1L;
long index = currentIndex;

final Subscriber<? super Integer> childSubscriber = this.childSubscriber;

for (;;) {

for (long i = idx; i != fs; i++) {
if (o.isUnsubscribed()) {
while (emitted != requestedAmount && index != endIndex) {
if (childSubscriber.isUnsubscribed()) {
return;
}
o.onNext((int) i);

childSubscriber.onNext((int)index);

index++;
emitted++;
}

if (complete) {
if (o.isUnsubscribed()) {
return;
}
o.onCompleted();
if (childSubscriber.isUnsubscribed()) {
return;
}

idx = fs;
index = fs;

r = addAndGet(-e);
if (r == 0L) {
// we're done emitting the number requested so return
if (index == endIndex) {
childSubscriber.onCompleted();
return;
}

requestedAmount = get();

if (requestedAmount == emitted) {
currentIndex = index;
requestedAmount = addAndGet(-emitted);
if (requestedAmount == 0L) {
break;
}
emitted = 0L;
}
}
}

/**
*
* Emits all remaining values without decrementing the requested amount.
*/
void fastpath() {
final long end = this.end + 1L;
final Subscriber<? super Integer> o = this.o;
for (long i = index; i != end; i++) {
if (o.isUnsubscribed()) {
final long endIndex = this.endOfRange + 1L;
final Subscriber<? super Integer> childSubscriber = this.childSubscriber;
for (long index = currentIndex; index != endIndex; index++) {
if (childSubscriber.isUnsubscribed()) {
return;
}
o.onNext((int) i);
childSubscriber.onNext((int) index);
}
if (!o.isUnsubscribed()) {
o.onCompleted();
if (!childSubscriber.isUnsubscribed()) {
childSubscriber.onCompleted();
}
}
}
Expand Down
22 changes: 20 additions & 2 deletions src/main/java/rx/internal/operators/OperatorMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,24 @@ static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>
/** An empty array to avoid creating new empty arrays in removeInner. */
static final InnerSubscriber<?>[] EMPTY = new InnerSubscriber<?>[0];

final int scalarEmissionLimit;

int scalarEmissionCount;

public MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int maxConcurrent) {
this.child = child;
this.delayErrors = delayErrors;
this.maxConcurrent = maxConcurrent;
this.nl = NotificationLite.instance();
this.innerGuard = new Object();
this.innerSubscribers = EMPTY;
request(maxConcurrent == Integer.MAX_VALUE ? Long.MAX_VALUE : maxConcurrent);
if (maxConcurrent == Integer.MAX_VALUE) {
scalarEmissionLimit = Integer.MAX_VALUE;
request(Long.MAX_VALUE);
} else {
scalarEmissionLimit = Math.max(1, maxConcurrent >> 1);
request(maxConcurrent);
}
}

Queue<Throwable> getOrCreateErrorQueue() {
Expand Down Expand Up @@ -488,7 +498,15 @@ protected void emitScalar(T value, long r) {
if (r != Long.MAX_VALUE) {
producer.produced(1);
}
this.requestMore(1);

int produced = scalarEmissionCount + 1;
if (produced == scalarEmissionLimit) {
scalarEmissionCount = 0;
this.requestMore(produced);
} else {
scalarEmissionCount = produced;
}

// check if some state changed while emitting
synchronized (this) {
skipFinal = true;
Expand Down

0 comments on commit 3b93232

Please sign in to comment.