-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
flatMap: fixed scalar-merging. #2878
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -131,8 +131,8 @@ private static final class MergeSubscriber<T> extends Subscriber<Observable<? ex | |
private volatile RxRingBuffer scalarValueQueue = null; | ||
|
||
/* protected by lock on MergeSubscriber instance */ | ||
private int missedEmitting = 0; | ||
private boolean emitLock = false; | ||
private boolean missedEmitting; | ||
private boolean emitLock; | ||
|
||
/** | ||
* Using synchronized(this) for `emitLock` instead of ReentrantLock or AtomicInteger is faster when there is no contention. | ||
|
@@ -246,63 +246,58 @@ private void handleScalarSynchronousObservable(ScalarSynchronousObservable<? ext | |
|
||
private void handleScalarSynchronousObservableWithoutRequestLimits(ScalarSynchronousObservable<? extends T> t) { | ||
T value = t.get(); | ||
if (getEmitLock()) { | ||
boolean moreToDrain; | ||
try { | ||
actual.onNext(value); | ||
} finally { | ||
moreToDrain = releaseEmitLock(); | ||
} | ||
if (moreToDrain) { | ||
drainQueuesIfNeeded(); | ||
} | ||
request(1); | ||
return; | ||
} else { | ||
try { | ||
getOrCreateScalarValueQueue().onNext(value); | ||
} catch (MissingBackpressureException e) { | ||
onError(e); | ||
try { | ||
synchronized (this) { | ||
if (emitLock) { | ||
missedEmitting = true; | ||
getOrCreateScalarValueQueue().onNext(value); | ||
return; | ||
} | ||
missedEmitting = false; | ||
emitLock = true; | ||
} | ||
} catch (MissingBackpressureException e) { | ||
onError(e); | ||
return; | ||
} | ||
boolean moreToDrain; | ||
try { | ||
actual.onNext(value); | ||
} finally { | ||
moreToDrain = releaseEmitLock(); | ||
} | ||
if (moreToDrain) { | ||
drainQueuesIfNeeded(); | ||
} | ||
request(1); | ||
} | ||
|
||
private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronousObservable<? extends T> t) { | ||
|
||
// if we didn't return above we need to enqueue | ||
// enqueue the values for later delivery | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These comments are confusing here as what "return above" is it referring to? It's the start of a method. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, those comments were copy/pasted from the end of the method ... yes, definitely unclear. |
||
try { | ||
getOrCreateScalarValueQueue().onNext(t.get()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This now means it will always pass through a queue which diminishes some of the optimization. This class tries hard to avoid putting values in a queue if it can otherwise emit directly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure what caused it, but if I didn't do this enqueueing, despite the other changes the operator often didn't deliver all scalar values; not to mention, scalar values could get reordered for some reason. |
||
} catch (MissingBackpressureException e) { | ||
onError(e); | ||
return; | ||
} | ||
|
||
if (getEmitLock()) { | ||
boolean emitted = false; | ||
int emitted = 0; | ||
boolean moreToDrain; | ||
boolean isReturn = false; | ||
try { | ||
long r = mergeProducer.requested; | ||
if (r > 0) { | ||
emitted = true; | ||
actual.onNext(t.get()); | ||
MergeProducer.REQUESTED.decrementAndGet(mergeProducer); | ||
// we handle this Observable without ever incrementing the wip or touching other machinery so just return here | ||
isReturn = true; | ||
} | ||
emitted = drainScalarValueQueue(); | ||
} finally { | ||
moreToDrain = releaseEmitLock(); | ||
} | ||
if (moreToDrain) { | ||
drainQueuesIfNeeded(); | ||
} | ||
if (emitted) { | ||
request(1); | ||
} | ||
if (isReturn) { | ||
return; | ||
if (emitted > 0) { | ||
request(emitted); | ||
} | ||
} | ||
|
||
// if we didn't return above we need to enqueue | ||
// enqueue the values for later delivery | ||
try { | ||
getOrCreateScalarValueQueue().onNext(t.get()); | ||
} catch (MissingBackpressureException e) { | ||
onError(e); | ||
} | ||
} | ||
|
||
private RxRingBuffer getOrCreateScalarValueQueue() { | ||
|
@@ -316,20 +311,16 @@ private RxRingBuffer getOrCreateScalarValueQueue() { | |
|
||
private synchronized boolean releaseEmitLock() { | ||
emitLock = false; | ||
if (missedEmitting == 0) { | ||
return false; | ||
} else { | ||
return true; | ||
} | ||
return missedEmitting; | ||
} | ||
|
||
private synchronized boolean getEmitLock() { | ||
if (emitLock) { | ||
missedEmitting++; | ||
missedEmitting = true; | ||
return false; | ||
} else { | ||
emitLock = true; | ||
missedEmitting = 0; | ||
missedEmitting = false; | ||
return true; | ||
} | ||
} | ||
|
@@ -446,7 +437,8 @@ private void innerError(Throwable e, boolean parent) { | |
if (!parent) { | ||
wip--; | ||
} | ||
if ((wip == 0 && completed) || (wip < 0)) { | ||
RxRingBuffer svq = scalarValueQueue; | ||
if ((wip == 0 && completed && (svq == null || svq.isEmpty())) || (wip < 0)) { | ||
sendOnComplete = true; | ||
} | ||
} | ||
|
@@ -463,7 +455,8 @@ public void onCompleted() { | |
boolean c = false; | ||
synchronized (this) { | ||
completed = true; | ||
if (wip == 0) { | ||
RxRingBuffer svq = scalarValueQueue; | ||
if (wip == 0 && (svq == null || svq.isEmpty())) { | ||
c = true; | ||
} | ||
} | ||
|
@@ -477,7 +470,8 @@ void completeInner(InnerSubscriber<T> s) { | |
boolean sendOnComplete = false; | ||
synchronized (this) { | ||
wip--; | ||
if (wip == 0 && completed) { | ||
RxRingBuffer svq = scalarValueQueue; | ||
if (wip == 0 && completed && (svq == null || svq.isEmpty())) { | ||
sendOnComplete = true; | ||
} | ||
} | ||
|
@@ -491,12 +485,12 @@ private void drainAndComplete() { | |
boolean moreToDrain = true; | ||
while (moreToDrain) { | ||
synchronized (this) { | ||
missedEmitting = 0; | ||
missedEmitting = false; | ||
} | ||
drainScalarValueQueue(); | ||
drainChildrenQueues(); | ||
synchronized (this) { | ||
moreToDrain = missedEmitting > 0; | ||
moreToDrain = missedEmitting; | ||
} | ||
} | ||
RxRingBuffer svq = scalarValueQueue; | ||
|
@@ -549,7 +543,8 @@ public void request(long n) { | |
if (ms.drainQueuesIfNeeded()) { | ||
boolean sendComplete = false; | ||
synchronized (ms) { | ||
if (ms.wip == 0 && ms.scalarValueQueue != null && ms.scalarValueQueue.isEmpty()) { | ||
RxRingBuffer svq = ms.scalarValueQueue; | ||
if (ms.wip == 0 && ms.completed && (svq == null || svq.isEmpty())) { | ||
sendComplete = true; | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a future rewrite we can optimize this because it could drain queues and request(n) and then immediately request(1) again.