window_service: use the service thread as a rayon worker thread #3876

Open: alessandrod wants to merge 1 commit into master from window-service-current-thread

Conversation

@alessandrod commented Dec 3, 2024

This reduces latency in processing a small number of shreds, since they'll be processed directly on the current thread, avoiding the rayon fork/join overhead.

On current mnb traffic this improves replay-loop-timing.wait_receive_elapsed_us ~2.5x.

[Screenshots, Dec 3 2024: metrics before/after]

@alessandrod (Author)

This is what becomes faster:

let (mut shreds, mut repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {

alessandrod requested a review from steviez on December 3, 2024
@behzadnouri left a comment

We already do something similar elsewhere in the code:
https://github.com/anza-xyz/agave/blob/397e92849/ledger/src/shred/merkle.rs#L1199-L1233
https://github.com/anza-xyz/agave/blob/397e92849/turbine/src/retransmit_stage.rs#L255-L305
i.e. forgo the thread-pool entirely if the batch is small.
To my understanding, compared with use_current_thread the difference is that:

  • use_current_thread effectively adds one more worker thread to the thread-pool.
  • use_current_thread still incurs the thread-pool overhead for small batches, whereas the above linked code bypasses it entirely.

So, one question is whether (and why) use_current_thread is better than forgoing the thread-pool entirely for small batches.

That said, my previous observations have been that this approach generally tends not to be very effective in practice. Intuitively, under load we generally see larger batches, and larger batches aren't processed any faster (if the thread-pool size is the same, though use_current_thread is adding one more worker thread here). Small batches are already processed pretty fast (relative to large batches), so processing them faster statistically does not move the needle much, unless the majority of batches are pretty small and the thread-pool is just adding overhead most of the time.

But the 2.5x improvement you are showing obviously does not line up with that intuition, so I am wondering if something else is going on here, like:

  • the majority of the batches are pretty small and we are just adding overhead with the thread-pool here?
  • the extra worker thread has a disproportionate impact here?
  • there is a performance bug in the rayon implementation which use_current_thread accidentally bypasses?

@alessandrod (Author)

We already do something similar elsewhere in the code: https://github.com/anza-xyz/agave/blob/397e92849/ledger/src/shred/merkle.rs#L1199-L1233 https://github.com/anza-xyz/agave/blob/397e92849/turbine/src/retransmit_stage.rs#L255-L305 i.e. forgo the thread-pool entirely if the batch is small. To my understanding, comparing to use_current_thread the difference is that:

* `use_current_thread` effectively adds one more worker thread to the thread-pool.

* `use_current_thread` still incurs the thread-pool overhead for small batches whereas above linked code bypasses it entirely.

So, one question is why or if use_current_thread is better than forgoing the thread-pool entirely for small batches.

No, it doesn't: with the calling thread registered as a worker, small batches don't pay the thread-pool overhead. By making the calling thread a worker thread, calling pool.install(f) becomes equivalent to calling f(). Then, because the parallel iterator is started on the current thread, this is what happens:

  1. Two output vectors with 0 capacity are created to store the results of unzip.
  2. The input vector is logically split in two at the mid point; importantly this makes no copies, only the indices are computed.
  3. Two jobs are created: job_a to process [0..mid), job_b to process [mid..len).
  4. Both jobs are queued on the current worker's job queue (the calling thread, when use_current_thread() is set).
  5. The current worker wakes up one other worker, which races to steal jobs from the current worker's queue.
  6. The current worker starts executing its job queue; execution repeats from step 3.

When there are only a handful of shreds, there is virtually no splitting (and it can be reduced further with .with_min_len()), and there is no job stealing, because the current-thread worker is able to execute its whole job queue faster than the other worker can wake up and check for jobs. The only overhead is waking up one extra worker from the pool (this too can be avoided with .with_min_len()).

Memory-wise, the only allocations that happen are the two output vecs, which is the same as if rayon weren't being used.
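
To make the mechanics above concrete, here is a minimal, self-contained sketch of the pattern, not the agave code: the pool size, the toy batches, and the .with_min_len(8) threshold are illustrative assumptions. The calling thread registers itself as a pool worker via use_current_thread(), so install() runs its closure on that thread and a small input never leaves it.

```rust
use rayon::prelude::*;
use rayon::ThreadPoolBuilder;

fn main() {
    // The calling (service) thread becomes one of the pool's workers.
    let thread_pool = ThreadPoolBuilder::new()
        .num_threads(4) // hypothetical size
        .use_current_thread()
        .build()
        .unwrap();

    // Stand-ins for packet batches; each inner Vec plays the role of a PacketBatch.
    let batches: Vec<Vec<u32>> = vec![vec![1, 2, 3], vec![4, 5]];

    // Same shape as the window_service hot path: a parallel iterator whose
    // tuple results are unzipped into two output Vecs.
    let (doubled, is_even): (Vec<u32>, Vec<bool>) = thread_pool.install(|| {
        batches
            .par_iter()
            // Avoid splitting tiny inputs into more jobs than they are worth.
            .with_min_len(8)
            .flat_map_iter(|batch| batch.iter().map(|&x| (x * 2, x % 2 == 0)))
            .unzip()
    });

    assert_eq!(doubled, vec![2, 4, 6, 8, 10]);
    assert_eq!(is_even, vec![false, true, false, true, false]);
}
```

Because the calling thread is part of the pool, install() here does not have to park on a condvar waiting for another worker to pick up the job; for tiny inputs like the one above, the whole unzip typically completes on the current thread.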

That said, my previous observations have been that this approach generally tends to be not very effective in practice, intuitively because under load we generally see larger batches, and larger batches aren't processed any faster (if the thread-pool size is the same but use_current_thread is adding one more worker thread here) and small batches are already processed pretty fast (relative to large batches) so processing them faster statistically does not move the needle much unless majority of batches are pretty small and the thread-pool is just adding overhead most of the time.

But 2.5x improvement you are showing obviously does not line up with that intuition, so I am wondering if something else is going on here. like:

* majority of the batches are pretty small and we are just adding overhead with thread-pool here?

The majority of batches with current mnb traffic are small. By keeping the thread-pool we ensure we have capacity for spikes, which is the reason you objected to Steve's other PR to reduce the number of receive threads.

* the extra worker thread has disproportionate impact here?

Yes and no. The disproportionate impact comes from not making the current thread dispatch work to another thread, wake that thread up, put itself to sleep on a condvar, and then get woken up when the other thread notifies the condvar once the work is finished.

* there is a performance bug with rayon implementation which `use_current_thread` accidentally bypasses it?

No.

alessandrod force-pushed the window-service-current-thread branch from 830280f to 4f2c74a on December 4, 2024 04:08
@behzadnouri

Do you want to compare this code with the below patch? If you do not want to, that's ok; I can do it myself.

If you do, please note that:

  • use_current_thread adds an extra worker thread to the thread-pool, so an apples-to-apples comparison should adjust the thread-pool size accordingly.
  • Instead of using replay metrics, the handle_packets_elapsed_us metric in recv-window-insert-shreds gives a more isolated measurement of the change.
  • I used if num_packets < 8 { with minimal testing, and it was not on mainnet. Some other threshold might be optimal.
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 0d2e0b7531..5a0a501abd 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -309,15 +309,24 @@ where
             Some((shred, None))
         }
     };
+    let num_packets = packets.iter().map(PacketBatch::len).sum::<usize>();
     let now = Instant::now();
-    let (mut shreds, mut repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {
+    let (mut shreds, mut repair_infos): (Vec<_>, Vec<_>) = if num_packets < 8 {
         packets
-            .par_iter()
-            .flat_map_iter(|packets| packets.iter().filter_map(handle_packet))
+            .iter()
+            .flat_map(|packets| packets.iter().filter_map(handle_packet))
             .unzip()
-    });
+    } else {
+        thread_pool.install(|| {
+            packets
+                .par_iter()
+                .flat_map(PacketBatch::par_iter)
+                .filter_map(handle_packet)
+                .unzip()
+        })
+    };
     ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64;
-    ws_metrics.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
+    ws_metrics.num_packets += num_packets;
     ws_metrics.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count();
     ws_metrics.num_shreds_received += shreds.len();
     for packet in packets.iter().flat_map(PacketBatch::iter) {

@alessandrod (Author) commented Dec 4, 2024

Do you want to compare this code with the below patch? If you do not want to its ok. I can do it myself.

Have you read my comment above? Do you understand why use_current_thread improves things?

If you do, please note that:

* `use_current_thread` adds an extra worker thread to thread-pool, so an apple to apple comparison should adjust the thread-pool size accordingly.

This makes me think that you don't understand what the patch does. What do you think happens with your suggested patch at num_packets=9? Or num_packets=X+1 where X is whatever magic number happens to work with mnb traffic today, but stops being accurate tomorrow, and so perf falls off a cliff again?

Do I really have to spend a day to show you the overhead of futex?

@alessandrod (Author)

  • let num_packets = packets.iter().map(PacketBatch::len).sum::<usize>();

Without even considering that this requires an extra loop over all the packets in all the batches, which is something that is done with every single collection in this and other threads and drives me crazy. If we're trying to write perf-efficient code, it'd be great if we didn't loop 400 times over the same thing; do you agree?

@alessandrod (Author)

This is handle_packets btw, 5x+ faster:

[Screenshots, Dec 5 2024: handle_packets metrics before/after]

@behzadnouri

Okay, I misunderstood rayon's documentation. It was also not clear which part of my comment "No it doesn't." was replying to.
There is no reason to become angry or disrespectful when I am even offering to do the testing myself.

Below, right is handle_packets_elapsed_us with your change and left is the patch I posted above with if num_packets < 32 {. So the patch is ~2x faster.
I think you would at least need this part of the patch I posted above, but I have not yet tested if that is the reason for the 2x performance difference.

--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -313,7 +313,8 @@ where
     let (mut shreds, mut repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {
         packets
             .par_iter()
-            .flat_map_iter(|packets| packets.iter().filter_map(handle_packet))
+            .flat_map(PacketBatch::par_iter)
+            .filter_map(handle_packet)
             .unzip()
     });
     ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64;

Besides that:

  1. What are the downsides of use_current_thread? Why doesn't rayon do it by default?
  2. The documentation says something about leaking the registry. Isn't that an issue here? Does the leaking happen each time we call thread_pool.install or only once?
  3. Should we go ahead and add use_current_thread to all our thread-pools?

[Chart, Dec 5 2024: handle_packets_elapsed_us]

@alessandrod (Author)

I think you would at least need this part of the patch I posted above

Good point, this actually makes sense.

@alessandrod (Author)

There is no reason to become angry or disrespectful when I am even offering to do the testing myself.

I sent a patch that shows a 2.5x improvement in replay latency (which, btw, is the important metric, not some micro metric). You didn't understand why the patch works and asked some questions. I spent time answering your questions. You did not acknowledge my reply at all and instead just replied "try this patch".

This is annoying because 1) it clearly shows that you didn't understand the problem and weren't interested in my explanation, and 2) you sent an absolutely trivial patch as if I'm dumb and haven't thought of doing if MAGIC_NUMBER { dont_do_rayon }. I'm sending this patch because I spent time profiling this code and understanding what's going on. I have literally reviewed all our pool.install() usages before sending the patch. Be charitable and assume that you're not the only one being thorough.

I've told you this before: this "I know better" attitude comes up all the time and definitely irritates me.

@behzadnouri

I appreciate you being candid and straightforward, but as I have mentioned to you previously, there is nothing personal, and even if you do not agree with my opinions or review style there is no reason to get angry.
Here I asked you an honest question and I was genuinely interested to know how use_current_thread compares with the alternative that we have been doing elsewhere in the code. I even offered to do the testing myself (which I did). I do not know what else I can do so that you do not assume ill intent in code review comments or questions.

Below is mean handle_packets_elapsed_us over 30s intervals:

  • The left chunk is this PR.
  • The middle chunk is this PR + the .flat_map(PacketBatch::par_iter).filter_map(handle_packet) patch I mentioned above which surprisingly became worse.
  • The right chunk is the patch I posted earlier with if num_packets < 32 {.

[Chart, Dec 6 2024: mean handle_packets_elapsed_us over 30s intervals]

@steviez left a comment

Late to the game here - I needed to do some reading of docs & source myself to be able to add anything. I had some questions similar to Behzad that I ended up answering while digging around.

What are the downsides with use_current_thread? why rayon doesn't do it by default?

Nothing comes to mind for downsides, and having the calling thread (which is blocked waiting on the pool calculation to complete) chip in intuitively makes sense. As for why, I found this explanation in rayon issue 1052:

one caution that I want to put in the docs is that the pool-building thread can only involve itself in use_current for a single pool, and I'll need to put in an error for that case. So that's akin to a global resource, and I would generally not recommend a library crate to create pools this way

But given that we have context about our case, this seems fine.

The documentation says something about leaking registry. Isn't that an issue here? Does the leaking happen each time we call thread_pool.install or only once?

I believe this is a one-time thing at threadpool creation. Most of our pools are spun up at startup and kept for the lifetime of the process, so I think we can live with this. I believe this comment is referring to the same item from the docs:
https://github.com/rayon-rs/rayon/blob/d179ea9fa69cab08a3b2762ff9b15249a71b9b94/rayon-core/src/registry.rs#L291-L293

Should we go ahead and add use_current_thread to all our thread-pools?

I think we could add it to many of our threadpools, but I guess the two instances we need to avoid are:

  • One calling thread has access to two pools
  • Pools that are repeatedly created and destroyed (such as the pool created in Bank::new_from_parent() when we cross epoch boundaries)

@alessandrod (Author)

Should we go ahead and add use_current_thread to all our thread-pools?

I think we could add it to many of our threadpools, but I guess the two instances we need to avoid our:

* One calling thread has access to two pools

* Pools that are repeatedly created and destroyed (such as the pool created in `Bank::new_from_parent()` when we cross epoch boundaries)

We shouldn't when the cost of waiting for the workers to finish all the work is less than the cost of executing "some part of the work" on the current thread.

For example, in sigverify and entry verification, it's empirically faster to not do any work from the current thread (I've tried this last week).

@alessandrod (Author)

I appreciate you being candid and straightforward but as I have mentioned to you previously there is nothing personal and even if you do not agree with my opinions or review style there is no reason to get angry. Here I asked you an honest question and I was genuinely interested to know how does use_current_thread compare with the alternative that we have been doing elsewhere in the code. I even offered to do the testing myself (which I did). I do not know what else I can do so that you do not assume ill intent to code review comments or questions.

Below is mean handle_packets_elapsed_us over 30s intervals:

* The left chunk is this PR.

* The middle chunk is this PR + the `.flat_map(PacketBatch::par_iter).filter_map(handle_packet)` patch I [mentioned above](https://github.com/anza-xyz/agave/pull/3876#issuecomment-2520898038) which surprisingly became worse.

* The right chunk is [the patch I posted earlier](https://github.com/anza-xyz/agave/pull/3876#issuecomment-2518066334) with `if num_packets < 32 {`.

Again, what happens at num_packets = 33? And what happens when we merge the other PR that reduces the number of receive sockets, and so the batches get larger?

@steviez commented Dec 9, 2024

The right chunk is #3876 (comment) with if num_packets < 32 {

With current MNB load, I think you are bypassing the threadpool just about every time with this constant. Doing num_packets / run_insert_count shows a mean of like 2 packets per iteration over the past week.

The middle chunk is this PR + the .flat_map(PacketBatch::par_iter).filter_map(handle_packet) patch I #3876 (comment) which surprisingly became worse.

I think this can be explained by this section of the docs:
https://docs.rs/rayon/latest/rayon/iter/trait.ParallelIterator.html#flat_map_iter-versus-flat_map

With flat_map, each of the nested iterators must be a parallel iterator, and they will be further split up with nested parallelism.
With flat_map_iter, each nested iterator is a sequential Iterator, and we only parallelize between them, while the items produced by each nested iterator are processed sequentially.
...
If there’s little computation involved, or its length is much less than the outer parallel iterator, then it may perform better to avoid the overhead of parallelism, just flattening sequentially with flat_map_iter

I believe the extra parallelism might be biting us. More so, I think it makes sense to keep the processing of a single PacketBatch sequential for the sake of locality. We know upstream will be limiting batch sizes if there are enough packets coming through for it to matter (64 per batch I think), so it isn't like we'll have one thread processing 1k packets or something.

If we did want to use flat_map(PacketBatch::par_iter).filter_map(handle_packet), then the with_min_len() that Alessandro mentioned above could prevent too many tiny jobs from getting created.
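
For reference, a hedged sketch of the two shapes being compared; handle, the batch sizes, and the with_min_len(32) value are made-up stand-ins rather than the agave code:

```rust
use rayon::prelude::*;

// Stand-in for handle_packet: keep even values, drop odd ones.
fn handle(x: &u32) -> Option<u32> {
    (x % 2 == 0).then_some(*x)
}

fn main() {
    let batches: Vec<Vec<u32>> = vec![(0..64).collect(), (64..128).collect()];

    // flat_map_iter: parallelism only across batches; each batch is walked
    // sequentially on one worker, which keeps locality and creates few jobs.
    let a: Vec<u32> = batches
        .par_iter()
        .flat_map_iter(|batch| batch.iter().filter_map(handle))
        .collect();

    // flat_map over nested parallel iterators: packets within a batch may be
    // split into further jobs; with_min_len bounds how small those jobs get.
    let b: Vec<u32> = batches
        .par_iter()
        .flat_map(|batch| batch.par_iter().with_min_len(32))
        .filter_map(handle)
        .collect();

    assert_eq!(a, b); // both preserve order when collected into a Vec
}
```

Which of the two wins depends on batch sizes and per-packet cost, which is exactly the trade-off the docs quoted above describe.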

@behzadnouri

Again, what happens at num_packets = 33?

I am not sure what you are implying.
There are 2 code paths: one is generally slower but has less overhead, the other is faster but has more overhead. If num_packets = 32 (or whatever other threshold) is actually a good switchover point, the two have "roughly" the same performance around that point, and we switch from one to the other. Something like below:

[Sketch chart: cost of the two code paths vs num-packets, crossing near the switchover point]

I fully acknowledge that this is pretty simplified (in particular there are num-packet-batches in addition to num-packets) but that is the general idea.

And what happens when we merge the other PR that reduces the number of receive sockets, and so the batches get larger?

I do not know. That is something which needs to be tested and adjusted accordingly. We already had some observations here which were surprising and did not match expectations, so I would rather not guess.

The point is this PR is a significant improvement over the current master code, but nonetheless leaves another 2x performance improvement on the table. If you merge this PR as-is and then someone makes a follow-up PR which shows another 2x perf improvement, do you want to dismiss that additional 2x perf improvement?

@steviez commented Dec 10, 2024

Should we go ahead and add use_current_thread to all our thread-pools?

We shouldn't when the cost of waiting for the workers to finish all the work is less than the cost that it takes to execute "some part of the work" in the current thread.

For example, in sigverify and entry verification, it's empirically faster to not do any work from the current thread (I've tried this last week).

I'm not sure I completely follow the reasoning here. If the work-to-parallelize is costly (relative to the coordination-work), is the idea here that we don't want the current thread to get bogged down doing work-to-parallelize, which will delay it from completing the coordination-work?

Assuming I got that right, does heavier packet load change things in this case? I know the work-to-parallelize isn't very costly, but I'm wondering if an order-of-magnitude increase in the number of packets coming through this service changes that.

@alessandrod (Author)

Should we go ahead and add use_current_thread to all our thread-pools?

We shouldn't when the cost of waiting for the workers to finish all the work is less than the cost that it takes to execute "some part of the work" in the current thread.
For example, in sigverify and entry verification, it's empirically faster to not do any work from the current thread (I've tried this last week).

I'm not sure I completely follow the reasoning here. If the work-to-parallelize is costly (relative to the coordination-work), is the idea here that we don't want the current thread to get bogged down doing work-to-parallelize which will delay it from completing the coordination-work?

If use_current_thread is disabled, when calling pool.install(work) the current thread needs to A) queue the work in the pool (potentially a syscall), B) wake up some of the workers (a syscall), and C) put itself to sleep and wait to be signaled when all the work is complete (a syscall).

If use_current_thread is enabled, when calling pool.install(work) the current thread X) queues the work in a local, lock-free queue (never a syscall), Y) wakes up another worker (a syscall), and Z) starts executing the work.

If the pool can execute the work quickly (for example entry verification, which has 32+ threads available in the pool), and the time for the current thread to do one "unit" of work (a signature verification) makes X + Y + Z take longer than A + B + C, it's better not to execute that unit of work and to syscall instead: it has slightly more system overhead but it completes faster.
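
For what it's worth, a rough, hypothetical micro-benchmark sketch of that trade-off; the pool sizes, the 4-item input, and the iteration count are arbitrary assumptions, and whether the effective worker counts are comparable is deliberately left aside:

```rust
use rayon::prelude::*;
use rayon::{ThreadPool, ThreadPoolBuilder};
use std::time::Instant;

fn time_small_batch(pool: &ThreadPool, label: &str) {
    let items: Vec<u64> = (0..4).collect(); // "a handful of shreds"
    let start = Instant::now();
    for _ in 0..10_000 {
        // A + B + C vs X + Y + Z is paid on every install() call.
        let _sum: u64 = pool.install(|| items.par_iter().map(|x| x + 1).sum());
    }
    println!("{label}: {:?} per install", start.elapsed() / 10_000);
}

fn main() {
    let plain = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    time_small_batch(&plain, "without use_current_thread");

    // Only one pool per calling thread may use use_current_thread().
    let current = ThreadPoolBuilder::new()
        .num_threads(4)
        .use_current_thread()
        .build()
        .unwrap();
    time_small_batch(&current, "with use_current_thread");
}
```

If the reasoning above holds, the second configuration should report a noticeably smaller per-install latency for tiny inputs, which is the futex/condvar overhead being discussed.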

@steviez commented Dec 10, 2024

I restarted my node with this PR as I was interested in a blockstore metric. Looking at the two metrics that were mentioned above, I see the improvement in handle_packets_elapsed_us but not in wait_receive_elapsed_us. In fact, I actually see a slight increase in wait_receive:

[Chart: handle_packets_elapsed_us and wait_receive_elapsed_us after the restart]

The node has been in "steady-state" for ~8 hours, which I think is long enough that I should have observed the improvement. I applied this commit on top of 6c6c26e, which is the tip of master from ~12 hours ago. I'm going to restart with the exact state of this branch to see if that yields me the wait_receive gains.
