
Quadratic slowdown in latency of tokio::sync::broadcast::Sender::send as the number of Receivers increases, when each receiver is running in its own single-threaded runtime #5923

Closed
glittershark opened this issue Aug 9, 2023 · 4 comments · Fixed by #5925
Labels
A-tokio (Area: The main tokio crate) · C-bug (Category: This is a bug.) · E-help-wanted (Call for participation: Help is requested to fix this issue.) · E-medium (Call for participation: Experience needed to fix: Medium / intermediate) · M-sync (Module: tokio/sync)

Comments

@glittershark (Contributor) commented Aug 9, 2023

Version

❯ cargo tree | grep tokio
└── tokio v1.30.0
    └── tokio-macros v2.1.0 (proc-macro)

Platform

❯ uname -a
Linux ogopogo 6.1.41 #1-NixOS SMP PREEMPT_DYNAMIC Mon Jul 24 16:55:35 UTC 2023 x86_64 GNU/Linux

Description

In an application that launches many threads (up to hundreds), each of which runs a select loop polling tokio::sync::broadcast::Receiver::recv(), calls to the send half of that channel sporadically become quadratically slower, taking up to minutes, as the number of receivers increases past around 40. The behavior is worse by about half an order of magnitude when running in release mode. To demonstrate, I've written the following repro, which outputs a CSV:

use std::thread;
use std::time::Instant;
use std::{sync::Arc, time::Duration};

use tokio::select;
use tokio::sync::broadcast;
use tokio::time::sleep;

const BUFFER_SIZE: usize = 64;

async fn recv_loop(tx: Arc<broadcast::Sender<usize>>) {
    let mut rx = tx.subscribe();
    loop {
        select! {
            _ = sleep(Duration::from_millis(100)) => {}
            res = rx.recv() => {
                if matches!(res, Err(broadcast::error::RecvError::Closed)) {
                    break;
                }
            }
        }
    }
}

fn run_test(num_subscribers: usize) -> Duration {
    let (tx, _) = broadcast::channel(BUFFER_SIZE);
    let tx = Arc::new(tx);

    for _ in 0..num_subscribers {
        let tx = tx.clone();

        thread::spawn(move || {
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .max_blocking_threads(1)
                .build()
                .unwrap();

            runtime.block_on(recv_loop(tx));
        });
    }

    let start = Instant::now();
    let _ = tx.send(0);
    start.elapsed()
}

fn main() {
    for num_subscribers in 30..120 {
        let dur = run_test(num_subscribers);
        println!("{num_subscribers},{}", dur.as_nanos());
    }
}

I plotted the results for both debug and release builds when run on my machine:

[Plot: send latency in nanoseconds vs. number of subscribers, debug and release builds]

@glittershark added the A-tokio (Area: The main tokio crate) and C-bug (Category: This is a bug.) labels Aug 9, 2023
@glittershark changed the title from "Quadratic slowdown in latency of tokio::sync::broadcast::sender::Send as the number of Receivers increases, when each receiver is running in its own single-threaded runtime" to "Quadratic slowdown in latency of tokio::sync::broadcast::Sender::send as the number of Receivers increases, when each receiver is running in its own single-threaded runtime" Aug 9, 2023
readysetbot pushed a commit to readysettech/readyset that referenced this issue Aug 9, 2023
Within a running Replica, we maintain a map of established connections
for other domain replicas, to avoid having to re-establish a connection
on every message. If one of those replicas *died*, however, we'd never
remove the established connection from that map, meaning we'd never be
able to send messages to it if it ever came back.

Now, the `ChannelCoordinator` has a `tokio::sync::broadcast` channel
which can notify interested subscribers when the socket address for a
ReplicaAddress is changed - we subscribe to this in the Replica's
select-loop to remove the entry from that map of connections when we're
removing the configured socket from the channel coordinator.

This fixes an issue where after a worker died and came back, it'd never
be able to receive domain messages (such as full replay packets!).

This reverts commit 922f5028f2f75cc969805ec73033ceaf0f54320d.

Note that there is a *significant* performance issue here for large
deployments - there empirically seems to be a roughly quadratic slowdown
in `tokio::sync::broadcast::Sender::send` as the number of active
receivers increases - I've seen `send` take *minutes* when there are
more than like 100 receivers. We're band-aid-ing this issue (which is
why 922f5028f (Revert "server: Remove conns for replicas that die",
2023-08-03) happened in the first place) by avoiding sending anything on
the channel unless we *definitely* have to - but we ought to fix this at
some point in the future (it might be a tokio bug - I've reported it as
tokio-rs/tokio#5923 and we'll see how that
goes).

Change-Id: Ic56e30a05e3b1e7552defca452c2fd5526692a55
readysetbot pushed a second commit to readysettech/readyset that referenced this issue Aug 9, 2023, with the same commit message as above (Change-Id: Ic56e30a05e3b1e7552defca452c2fd5526692a55; Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5680; Tested-by: Buildkite CI; Reviewed-by: Luke Osborne).
@glittershark (Contributor, Author) commented:

Some extra info: whatever `send` is doing that takes so long, it does it after all of the receivers have received the message - somewhere in this loop.

@Darksonn (Contributor) commented:

It's probably that receivers keep getting re-added to the waiter list during that loop. We can most likely fix it by limiting the number of wakers woken to the total number of waiting receivers that existed before the loop started iterating.

Thanks for reporting this.
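
A minimal, self-contained sketch of that idea, outside of tokio's internals (all names below are hypothetical; in the real channel the change would live in `Shared::notify_rx`, and the entries would be actual `Waker`s rather than boxed callbacks): record how many receivers exist before draining the waiter list, and stop once that many wakers have been issued, so waiters that re-register mid-notification are left for the next send.

use std::collections::VecDeque;
use std::sync::Mutex;

// Hypothetical, simplified stand-in for the channel's shared state:
// a list of pending "wakers" (modelled as callbacks) plus a count of
// live receivers.
struct Shared {
    waiters: Mutex<VecDeque<Box<dyn FnOnce() + Send>>>,
    rx_cnt: usize,
}

impl Shared {
    // Wake pending waiters in batches, but never more of them than the
    // number of receivers that existed when notification started.
    // Waiters that re-register while we are waking stay queued for the
    // next send instead of being popped again within the same call.
    fn notify_rx(&self) {
        let limit = self.rx_cnt;
        let mut woken = 0;
        while woken < limit {
            // Take at most one batch while holding the lock, then wake
            // with the lock released.
            let batch: Vec<Box<dyn FnOnce() + Send>> = {
                let mut waiters = self.waiters.lock().unwrap();
                let n = (limit - woken).min(32).min(waiters.len());
                waiters.drain(..n).collect()
            };
            if batch.is_empty() {
                break;
            }
            woken += batch.len();
            for wake in batch {
                wake();
            }
        }
    }
}

fn main() {
    let shared = Shared {
        waiters: Mutex::new(VecDeque::new()),
        rx_cnt: 2,
    };
    let mut q = shared.waiters.lock().unwrap();
    q.push_back(Box::new(|| println!("receiver 0 woken")));
    q.push_back(Box::new(|| println!("receiver 1 woken")));
    drop(q);
    shared.notify_rx();
}

The patch proposed below takes the same approach inside the real `notify_rx`, using `tail.rx_cnt` as the bound.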

@Darksonn added the M-sync (Module: tokio/sync), E-help-wanted (Call for participation: Help is requested to fix this issue.), and E-medium (Call for participation: Experience needed to fix: Medium / intermediate) labels Aug 10, 2023
@glittershark (Contributor, Author) commented:

@Darksonn something like this? This does fix the perf issue, which is great!

diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs
index 246ec212..ce503996 100644
--- a/tokio/src/sync/broadcast.rs
+++ b/tokio/src/sync/broadcast.rs
@@ -820,6 +820,8 @@ fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
 impl<T> Shared<T> {
     fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
         let mut wakers = WakeList::new();
+        let active_receivers = tail.rx_cnt;
+        let mut woken = 0usize;
         'outer: loop {
             while wakers.can_push() {
                 match tail.waiters.pop_back() {
@@ -832,6 +834,7 @@ impl<T> Shared<T> {
 
                         if let Some(waker) = waiter.waker.take() {
                             wakers.push(waker);
+                            woken += 1;
                         }
                     }
                     None => {
@@ -852,6 +855,10 @@ impl<T> Shared<T> {
 
             // Acquire the lock again.
             tail = self.tail.lock();
+
+            if woken >= active_receivers {
+                break;
+            }
         }
 
         // Release the lock before waking.

@glittershark (Contributor, Author) commented:

I was worried that this wouldn't fix the issue if there was a receiver that never tried to recv(), but it looks like the perf issue is fixed even with this patch applied to my repro above:

diff --git a/src/main.rs b/src/main.rs
index d35832b..fbe9e4b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -23,7 +23,7 @@ async fn recv_loop(tx: Arc<broadcast::Sender<usize>>) {
 }
 
 fn run_test(num_subscribers: usize) -> Duration {
-    let (tx, _) = broadcast::channel(BUFFER_SIZE);
+    let (tx, rx_that_never_recvs) = broadcast::channel(BUFFER_SIZE);
     let tx = Arc::new(tx);
 
     for _ in 0..num_subscribers {
@@ -40,6 +40,8 @@ fn run_test(num_subscribers: usize) -> Duration {
         });
     }
 
+    drop(rx_that_never_recvs);
+
     let start = Instant::now();
     let _ = tx.send(0);
     start.elapsed()

Darksonn pushed a commit that referenced this issue Aug 14, 2023
Within `notify_rx`, looping while re-locking and re-reading from
`Shared.tail` as long as there are still available wakers causes a
quadratic slowdown as receivers that are looping on receiving from the
channel are added. Instead of continually re-reading from the original
list, this commit modifies `notify_rx` to move the waiters into a
separate list immediately, similar to how `Notify::notify_waiters` works,
using a new `WaitersList` struct modeled after `NotifyWaitersList`.

Fixes #5923
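
As a rough illustration of the merged approach (a simplified model with hypothetical names; per the commit message, the real fix moves the waiters into a `WaitersList` modeled after `NotifyWaitersList` and wakes real `Waker`s, rather than using a `VecDeque` of boxed callbacks as here): the waiters present when notification starts are moved out of the shared list in a single step, so a receiver that immediately re-registers while being woken is only observed by the next notification, not by the one currently in progress.

use std::collections::VecDeque;
use std::sync::Mutex;

struct Notifier {
    waiters: Mutex<VecDeque<Box<dyn FnOnce() + Send>>>,
}

impl Notifier {
    // Register a callback standing in for a waiting receiver's `Waker`.
    fn register(&self, wake: Box<dyn FnOnce() + Send>) {
        self.waiters.lock().unwrap().push_back(wake);
    }

    // Wake everything that was waiting at the time of the call.
    fn notify_all(&self) {
        // Move the whole list out while holding the lock once...
        let snapshot = std::mem::take(&mut *self.waiters.lock().unwrap());
        // ...then wake with the lock released; anything registered from
        // here on lands in the now-empty shared list and waits for the
        // next notification.
        for wake in snapshot {
            wake();
        }
    }
}

fn main() {
    let n = Notifier {
        waiters: Mutex::new(VecDeque::new()),
    };
    n.register(Box::new(|| println!("receiver 0 woken")));
    n.register(Box::new(|| println!("receiver 1 woken")));
    n.notify_all();
}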