diff --git a/benches/sync_mpsc.rs b/benches/sync_mpsc.rs index d6545e8047f..7b07d9373c8 100644 --- a/benches/sync_mpsc.rs +++ b/benches/sync_mpsc.rs @@ -73,6 +73,33 @@ fn contention_bounded(g: &mut BenchmarkGroup) { }); } +fn contention_bounded_recv_many(g: &mut BenchmarkGroup) { + let rt = rt(); + + g.bench_function("bounded_recv_many", |b| { + b.iter(|| { + rt.block_on(async move { + let (tx, mut rx) = mpsc::channel::(1_000_000); + + for _ in 0..5 { + let tx = tx.clone(); + tokio::spawn(async move { + for i in 0..1000 { + tx.send(i).await.unwrap(); + } + }); + } + + let mut buffer = Vec::::with_capacity(5_000); + let mut total = 0; + while total < 1_000 * 5 { + total += rx.recv_many(&mut buffer).await; + } + }) + }) + }); +} + fn contention_bounded_full(g: &mut BenchmarkGroup) { let rt = rt(); @@ -98,6 +125,33 @@ fn contention_bounded_full(g: &mut BenchmarkGroup) { }); } +fn contention_bounded_full_recv_many(g: &mut BenchmarkGroup) { + let rt = rt(); + + g.bench_function("bounded_full_recv_many", |b| { + b.iter(|| { + rt.block_on(async move { + let (tx, mut rx) = mpsc::channel::(100); + + for _ in 0..5 { + let tx = tx.clone(); + tokio::spawn(async move { + for i in 0..1000 { + tx.send(i).await.unwrap(); + } + }); + } + + let mut buffer = Vec::::with_capacity(5_000); + let mut total = 0; + while total < 1_000 * 5 { + total += rx.recv_many(&mut buffer).await; + } + }) + }) + }); +} + fn contention_unbounded(g: &mut BenchmarkGroup) { let rt = rt(); @@ -123,6 +177,33 @@ fn contention_unbounded(g: &mut BenchmarkGroup) { }); } +fn contention_unbounded_recv_many(g: &mut BenchmarkGroup) { + let rt = rt(); + + g.bench_function("unbounded_recv_many", |b| { + b.iter(|| { + rt.block_on(async move { + let (tx, mut rx) = mpsc::unbounded_channel::(); + + for _ in 0..5 { + let tx = tx.clone(); + tokio::spawn(async move { + for i in 0..1000 { + tx.send(i).unwrap(); + } + }); + } + + let mut buffer = Vec::::with_capacity(5_000); + let mut total = 0; + while total < 1_000 * 5 { + total += rx.recv_many(&mut buffer).await; + } + }) + }) + }); +} + fn uncontented_bounded(g: &mut BenchmarkGroup) { let rt = rt(); @@ -143,6 +224,28 @@ fn uncontented_bounded(g: &mut BenchmarkGroup) { }); } +fn uncontented_bounded_recv_many(g: &mut BenchmarkGroup) { + let rt = rt(); + + g.bench_function("bounded_recv_many", |b| { + b.iter(|| { + rt.block_on(async move { + let (tx, mut rx) = mpsc::channel::(1_000_000); + + for i in 0..5000 { + tx.send(i).await.unwrap(); + } + + let mut buffer = Vec::::with_capacity(5_000); + let mut total = 0; + while total < 1_000 * 5 { + total += rx.recv_many(&mut buffer).await; + } + }) + }) + }); +} + fn uncontented_unbounded(g: &mut BenchmarkGroup) { let rt = rt(); @@ -163,6 +266,28 @@ fn uncontented_unbounded(g: &mut BenchmarkGroup) { }); } +fn uncontented_unbounded_recv_many(g: &mut BenchmarkGroup) { + let rt = rt(); + + g.bench_function("unbounded_recv_many", |b| { + b.iter(|| { + rt.block_on(async move { + let (tx, mut rx) = mpsc::unbounded_channel::(); + + for i in 0..5000 { + tx.send(i).unwrap(); + } + + let mut buffer = Vec::::with_capacity(5_000); + let mut total = 0; + while total < 1_000 * 5 { + total += rx.recv_many(&mut buffer).await; + } + }) + }) + }); +} + fn bench_create_medium(c: &mut Criterion) { let mut group = c.benchmark_group("create_medium"); create_medium::<1>(&mut group); @@ -181,15 +306,20 @@ fn bench_send(c: &mut Criterion) { fn bench_contention(c: &mut Criterion) { let mut group = c.benchmark_group("contention"); contention_bounded(&mut group); + contention_bounded_recv_many(&mut group); contention_bounded_full(&mut group); + contention_bounded_full_recv_many(&mut group); contention_unbounded(&mut group); + contention_unbounded_recv_many(&mut group); group.finish(); } fn bench_uncontented(c: &mut Criterion) { let mut group = c.benchmark_group("uncontented"); uncontented_bounded(&mut group); + uncontented_bounded_recv_many(&mut group); uncontented_unbounded(&mut group); + uncontented_unbounded_recv_many(&mut group); group.finish(); } diff --git a/benches/sync_mpsc_recv_many.rs b/benches/sync_mpsc_recv_many.rs deleted file mode 100644 index 44b9e8b65fe..00000000000 --- a/benches/sync_mpsc_recv_many.rs +++ /dev/null @@ -1,379 +0,0 @@ -use bencher::Bencher; -use tokio::sync::mpsc; - -fn rt() -> tokio::runtime::Runtime { - tokio::runtime::Builder::new_multi_thread() - .worker_threads(6) - .build() - .unwrap() -} - -// Simulate a use case of an actor that must update -// a resource, but resource only needs last value -fn publish_last_value(last_value: usize) -> usize { - std::thread::sleep(std::time::Duration::from_nanos(1)); - last_value -} - -fn contention_bounded_updater_recv(b: &mut Bencher) { - let rt = rt(); - - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::channel::(1_000_000); - - for _ in 0..5 { - let tx = tx.clone(); - tokio::spawn(async move { - for i in 0..1000 { - tx.send(i).await.unwrap(); - } - }); - } - - let mut last_value = 0usize; - for _ in 0..1_000 * 5 { - let Some(v) = rx.recv().await else { continue }; - last_value = v - } - last_value - }) - }); -} - -fn contention_bounded_updater_recv_many(b: &mut Bencher) { - let rt = rt(); - - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::channel::(1_000_000); - - for _ in 0..5 { - let tx = tx.clone(); - tokio::spawn(async move { - for i in 0..1000 { - tx.send(i).await.unwrap(); - } - }); - } - - let mut last_value = 0usize; - let mut buffer = Vec::::with_capacity(5_000); - let mut total = 0; - while total < 1_000 * 5 { - let count = rx.recv_many(&mut buffer).await; - total += count; - if count > 0 { - last_value = buffer[buffer.len() - 1] - } - } - last_value - }) - }); -} - -fn contention_bounded_updater_publish_recv(b: &mut Bencher) { - let rt = rt(); - - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::channel::(1_000_000); - - for _ in 0..1 { - let tx = tx.clone(); - tokio::spawn(async move { - for i in 0..1000 { - tx.send(i).await.unwrap(); - } - }); - } - - for _ in 0..1_000 { - let Some(v) = rx.recv().await else { continue }; - let _ = publish_last_value(v); - } - }) - }); -} - -fn contention_bounded_updater_publish_recv_many(b: &mut Bencher) { - let rt = rt(); - - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::channel::(1_000_000); - - for _ in 0..1 { - let tx = tx.clone(); - tokio::spawn(async move { - for i in 0..1000 { - tx.send(i).await.unwrap(); - } - }); - } - - let mut buffer = Vec::::with_capacity(5_000); - let mut total = 0; - while total < 1_000 * 1 { - let count = rx.recv_many(&mut buffer).await; - total += count; - if count > 0 { - publish_last_value(buffer[buffer.len() - 1]); - } - } - }) - }); -} - -fn contention_bounded_full_updater_recv(b: &mut Bencher) { - let rt = rt(); - - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::channel::(100); - - for _ in 0..5 { - let tx = tx.clone(); - tokio::spawn(async move { - for i in 0..1000 { - tx.send(i).await.unwrap(); - } - }); - } - - let mut last_value = 0usize; - for _ in 0..1_000 * 5 { - let Some(v) = rx.recv().await else { continue }; - last_value = v - } - last_value - }) - }); -} - -fn contention_bounded_full_updater_recv_many(b: &mut Bencher) { - let rt = rt(); - - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::channel::(100); - - for _ in 0..5 { - let tx = tx.clone(); - tokio::spawn(async move { - for i in 0..1000 { - tx.send(i).await.unwrap(); - } - }); - } - - let mut last_value = 0usize; - let mut buffer = Vec::::with_capacity(5_000); - let mut total = 0; - while total < 1_000 * 5 { - let count = rx.recv_many(&mut buffer).await; - total += count; - if count > 0 { - last_value = buffer[buffer.len() - 1] - } - } - last_value - }) - }); -} - -fn contention_unbounded_updater_recv(b: &mut Bencher) { - let rt = rt(); - - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::unbounded_channel::(); - - for _ in 0..5 { - let tx = tx.clone(); - tokio::spawn(async move { - for i in 0..1000 { - tx.send(i).unwrap(); - } - }); - } - - let mut last_value = 0usize; - for _ in 0..1_000 * 5 { - let Some(v) = rx.recv().await else { continue }; - last_value = v - } - last_value - }) - }); -} - -fn contention_unbounded_updater_recv_many(b: &mut Bencher) { - let rt = rt(); - - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::unbounded_channel::(); - - for _ in 0..5 { - let tx = tx.clone(); - tokio::spawn(async move { - for i in 0..1000 { - tx.send(i).unwrap(); - } - }); - } - - let mut last_value = 0usize; - let mut buffer = Vec::::with_capacity(5_000); - let mut total = 0; - while total < 1_000 * 5 { - let count = rx.recv_many(&mut buffer).await; - total += count; - if count > 0 { - last_value = buffer[buffer.len() - 1] - } - } - last_value - }) - }); -} - -fn uncontented_bounded_updater_recv(b: &mut Bencher) { - let rt = rt(); - - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::channel::(1_000_000); - - for i in 0..5000 { - tx.send(i).await.unwrap(); - } - - let mut last_value = 0usize; - for _ in 0..5_000 { - let Some(v) = rx.recv().await else { continue }; - last_value = v - } - last_value - }) - }); -} - -fn uncontented_bounded_updater_recv_many(b: &mut Bencher) { - let rt = rt(); - - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::channel::(1_000_000); - - for i in 0..5000 { - tx.send(i).await.unwrap(); - } - - let mut last_value = 0usize; - let mut buffer = Vec::::with_capacity(5_000); - let mut total = 0; - while total < 1_000 * 5 { - let count = rx.recv_many(&mut buffer).await; - total += count; - if count > 0 { - last_value = buffer[buffer.len() - 1] - } - } - last_value - }) - }); -} - -fn uncontented_unbounded_updater_recv(b: &mut Bencher) { - let rt = rt(); - - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::unbounded_channel::(); - - for i in 0..5000 { - tx.send(i).unwrap(); - } - - let mut last_value = 0usize; - for _ in 0..5_000 { - let Some(v) = rx.recv().await else { continue }; - last_value = v - } - last_value - }) - }); -} - -fn uncontented_unbounded_updater_recv_many(b: &mut Bencher) { - let rt = rt(); - - b.iter(|| { - rt.block_on(async move { - let (tx, mut rx) = mpsc::unbounded_channel::(); - - for i in 0..5000 { - tx.send(i).unwrap(); - } - - let mut last_value = 0usize; - let mut buffer = Vec::::with_capacity(5_000); - let mut total = 0; - while total < 1_000 * 5 { - let count = rx.recv_many(&mut buffer).await; - total += count; - if count > 0 { - last_value = buffer[buffer.len() - 1] - } - } - last_value - }) - }); -} - -bencher::benchmark_group!( - contention_bounded_updater, - contention_bounded_updater_recv, - contention_bounded_updater_recv_many -); - -bencher::benchmark_group!( - contention_bounded_updater_publish, - contention_bounded_updater_publish_recv, - contention_bounded_updater_publish_recv_many -); - -bencher::benchmark_group!( - contention_bounded_full_updater, - contention_bounded_full_updater_recv, - contention_bounded_full_updater_recv_many -); - -bencher::benchmark_group!( - contention_unbounded_updater, - contention_unbounded_updater_recv, - contention_unbounded_updater_recv_many -); - -bencher::benchmark_group!( - uncontented_bounded_updater, - uncontented_bounded_updater_recv, - uncontented_bounded_updater_recv_many -); - -bencher::benchmark_group!( - uncontented_unbounded_updater, - uncontented_unbounded_updater_recv, - uncontented_unbounded_updater_recv_many -); - -bencher::benchmark_main!( - contention_bounded_updater, - contention_bounded_updater_publish, - contention_bounded_full_updater, - contention_unbounded_updater, - uncontented_bounded_updater, - uncontented_unbounded_updater -);