1.x: overhead reduction for merge and flatMap #3476
Conversation
Rebasing...
(force-pushed from 7c7bb7a to a8ce5e3)
... rebased, sort of.
@@ -488,7 +502,10 @@ protected void emitScalar(T value, long r) {
     if (r != Long.MAX_VALUE) {
         producer.produced(1);
     }
-    this.requestMore(1);
+    if (++scalarEmission == scalarLimit) {
Nice!
👍/2, I was able to understand what you did, but @akarnokd can you please somehow improve the readability of the variable names? The performance improvement looks awesome, thank you for doing that!
For me, it is the opposite; reading long variable names and methods drains my focus. You can always post a cleanup PR.
@akarnokd there might be some happy medium here. Write your code using one ...
(force-pushed from a8ce5e3 to 5865f4a)
Fine, I've renamed the variables. Now it doesn't fit on my screen and I can't tell what is what because all the variable names look the same at a glance. Distinguishing between ...
(force-pushed from 5865f4a to 3b93232)
Rebased onto main.
    scalarEmissionLimit = Integer.MAX_VALUE;
    request(Long.MAX_VALUE);
} else {
    scalarEmissionLimit = Math.max(1, maxConcurrent >> 1);
A small question: any reason for setting scalarEmissionLimit to maxConcurrent / 2? Or is 2 just a magic number?
Just curious
I tend to use 50%-75% of the prefetch amount as the re-request amount. It's a heuristic, and its optimal value depends on the source's emission pattern. The idea is to amortize the roughly 40-cycle cost of an atomic increment by doing it less frequently. Basically, any value above 1 helps reduce the overhead, but if it is too large, the source-to-consumer pipelining effect is lost.
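(Illustration only, not code from this PR: a minimal sketch of the batched re-request idea described above, using made-up names BatchingSubscriber, prefetch, limit and consumed. The real operator does this bookkeeping inside its emission path rather than in a standalone subscriber.)

import rx.Subscriber;

// Sketch: prefetch a fixed amount up front, then re-request in chunks of
// roughly half the prefetch instead of calling request(1) per element, so the
// atomic request bookkeeping is amortized over many items.
public final class BatchingSubscriber<T> extends Subscriber<T> {
    final int prefetch;
    final int limit;      // re-request threshold, e.g. 50% of the prefetch
    int consumed;

    BatchingSubscriber(int prefetch) {
        this.prefetch = prefetch;
        this.limit = Math.max(1, prefetch >> 1);
    }

    @Override
    public void onStart() {
        request(prefetch);                 // initial batch
    }

    @Override
    public void onNext(T t) {
        // ... consume t ...
        if (++consumed == limit) {         // one atomic request per 'limit' items
            consumed = 0;
            request(limit);
        }
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onCompleted() {
        // no-op
    }
}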
Thanks for clarifying 👍
👍
int produced = scalarEmissionCount + 1;
if (produced == scalarEmissionLimit) {
    scalarEmissionCount = 0;
    this.requestMore(produced);
nit: use the const scalarEmissionLimit?
That equals produced, which is already a local/register-loaded value.
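(An illustrative restatement of that point, not new code in the PR: both calls would pass the same number, but produced is a local that is likely already in a register, while scalarEmissionLimit is an instance field that would have to be re-read.)

int produced = scalarEmissionCount + 1;
if (produced == scalarEmissionLimit) {
    scalarEmissionCount = 0;
    this.requestMore(produced);                    // value already in a local/register
    // vs. this.requestMore(scalarEmissionLimit);  // would re-read the instance field
}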
1.x: overhead reduction for merge and flatMap
The changes to the scalar fast path were inspired by Project Reactor's flatMap, which was in turn inspired by RxJava 2.x's implementation of flatMap. Naturally, this will conflict with #3169.
Benchmark for comparison (i7 4770K, Windows 7 x64, Java 8u66):
Just by applying the scalar re-batching, the operator gained a massive 45% throughput increase, from 48 MOps/s to 71 MOps/s.
When the range optimization is also applied, the improvement is even more impressive: +60% throughput, from 48 MOps/s to 79 MOps/s.
The optimization doesn't really affect rangeFlatMapRange; it has a larger run-to-run variance due to GC. I'm experimenting with the 2.x branch as well, and by applying these two optimizations there, the throughput increased from 40 MOps/s to 58 MOps/s. I'm investigating whether switching to synchronized would help with the remaining overhead gap.
Note also that the perf tests measure the operator overhead only.
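(A hypothetical sketch, not the actual RxJava perf class behind the numbers above: a JMH benchmark in this style measures flatMap over scalar just() sources, so the operator's own overhead dominates the cost per element. The class, field, and parameter names here are made up.)

import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;

import rx.Observable;

@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Thread)
public class FlatMapOverheadPerf {

    @Param({ "1", "1000", "1000000" })
    public int count;

    Observable<Integer> source;

    @Setup
    public void setup() {
        // Each upstream value maps to a scalar just() source, exercising the
        // scalar fast path whose requests this PR batches.
        source = Observable.range(1, count)
                .flatMap(v -> Observable.just(v));
    }

    @Benchmark
    public void rangeFlatMapJust(Blackhole bh) {
        source.subscribe(bh::consume);
    }
}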