Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: no more channels for UDP send/recv #1579

Merged
merged 36 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
35c1dff
bench: add iroh-net benchmark
Frando Oct 4, 2023
c759763
put peermap behind a mutex
Frando Oct 5, 2023
5eeab77
do not use channels for udp recv
Frando Oct 5, 2023
fab4eca
make things work properly
Frando Oct 5, 2023
2480dbf
fix
Frando Oct 5, 2023
f91e44b
one less PeerMap
Frando Oct 5, 2023
824469b
less peermap locking
Frando Oct 5, 2023
1cc984b
no more channels for send on quic
Frando Oct 5, 2023
d5e86d2
fix tests
Frando Oct 5, 2023
5cbfe03
cleanup
Frando Oct 5, 2023
d49c8f3
fix: better error handling and logging in poll_send
Frando Oct 6, 2023
aa4a575
refactor: use peermap directly for add_peer_addr
Frando Oct 7, 2023
73f5cc8
fix: actually ignore non-quic packets in quinn
Frando Oct 7, 2023
2bab081
cleanups and small fixes
dignifiedquire Oct 13, 2023
7509edd
Merge remote-tracking branch 'origin/main' into net-less-channels
dignifiedquire Oct 13, 2023
4c2650a
clippy
dignifiedquire Oct 13, 2023
33acfaf
fix: inform gossip about our own endpoint info
dignifiedquire Oct 13, 2023
a9a1ccd
Merge remote-tracking branch 'origin/main' into net-less-channels
Frando Oct 17, 2023
42f0699
fix: examples
Frando Oct 17, 2023
4dbf673
fix
Frando Oct 17, 2023
0fc730c
fix: doc links
Frando Oct 17, 2023
f897662
fix: restore truncate of derp transmits to udp transmits
Frando Oct 17, 2023
e594a26
fix: properly count quic packets
Frando Oct 17, 2023
e874cd6
refactor: use fixed buffer in poll_send to avoid alloc
Frando Oct 17, 2023
a45afa0
chore: clippy
Frando Oct 17, 2023
121ca03
Merge remote-tracking branch 'origin/main' into net-less-channels
dignifiedquire Oct 18, 2023
b1aa41a
Merge remote-tracking branch 'origin/main' into net-less-channels
dignifiedquire Oct 18, 2023
776f3f6
log when best addr is cleared, plus a potential correction
divagant-martian Oct 18, 2023
0390b86
refactor(iroh-net): simplify handling of disco message sources (#1665)
dignifiedquire Oct 18, 2023
d2b2a24
feat: improve tracing in iroh-net
Frando Oct 18, 2023
46c4418
fix: clippy and logs
Frando Oct 18, 2023
2ade0c4
fix: log pong receives
Frando Oct 18, 2023
8d0009b
fix: make quinn ignore unmapped destinations
Frando Oct 18, 2023
f9045ef
bench: fix missing debug impls
dignifiedquire Oct 19, 2023
bc83702
fix: double tracing fields
Frando Oct 19, 2023
8fcef6d
refactor: address review comments
Frando Oct 19, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ members = [
"iroh-net",
"iroh-sync",
"iroh-test",
"iroh-net/bench"
]
resolver = "2"

Expand Down
1 change: 1 addition & 0 deletions iroh-net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ libc = "0.2.139"
num_enum = "0.6.1"
once_cell = "1.17.0"
os_info = "3.6.0"
parking_lot = "0.12.1"
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
rand = "0.8"
rand_core = "0.6.4"
Expand Down
19 changes: 19 additions & 0 deletions iroh-net/bench/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "bench"
Frando marked this conversation as resolved.
Show resolved Hide resolved
version = "0.1.0"
edition = "2021"
license = "MIT OR Apache-2.0"
publish = false

[dependencies]
anyhow = "1.0.22"
bytes = "1"
hdrhistogram = { version = "7.2", default-features = false }
iroh-net = { path = ".." }
quinn = "0.10"
rcgen = "0.11.1"
rustls = { version = "0.21.0", default-features = false, features = ["quic"] }
clap = { version = "4", features = ["derive"] }
tokio = { version = "1.0.1", features = ["rt", "sync"] }
tracing = "0.1.10"
tracing-subscriber = { version = "0.3.0", default-features = false, features = ["env-filter", "fmt", "ansi", "time", "local-time"] }
211 changes: 211 additions & 0 deletions iroh-net/bench/src/bin/bulk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
use std::{
sync::{Arc, Mutex},
time::Instant,
};

use anyhow::{Context, Result};
use clap::Parser;
use iroh_net::{MagicEndpoint, PeerAddr};
use tokio::sync::Semaphore;
use tracing::{info, trace};

use bench::{
configure_tracing_subscriber, connect_client, drain_stream, rt, send_data_on_stream,
server_endpoint,
stats::{Stats, TransferResult},
Opt,
};

fn main() {
let opt = Opt::parse();
configure_tracing_subscriber();

let server_span = tracing::error_span!("server");
let runtime = rt();
let (server_addr, endpoint) = {
let _guard = server_span.enter();
server_endpoint(&runtime, &opt)
};

let server_thread = std::thread::spawn(move || {
let _guard = server_span.entered();
if let Err(e) = runtime.block_on(server(endpoint, opt)) {
eprintln!("server failed: {e:#}");
}
});

let mut handles = Vec::new();
for id in 0..opt.clients {
let server_addr = server_addr.clone();
handles.push(std::thread::spawn(move || {
let _guard = tracing::error_span!("client", id).entered();
let runtime = rt();
match runtime.block_on(client(server_addr, opt)) {
Ok(stats) => Ok(stats),
Err(e) => {
eprintln!("client failed: {e:#}");
Err(e)
}
}
}));
}

for (id, handle) in handles.into_iter().enumerate() {
// We print all stats at the end of the test sequentially to avoid
// them being garbled due to being printed concurrently
if let Ok(stats) = handle.join().expect("client thread") {
stats.print(id);
}
}

server_thread.join().expect("server thread");
}

async fn server(endpoint: MagicEndpoint, opt: Opt) -> Result<()> {
let mut server_tasks = Vec::new();

// Handle only the expected amount of clients
for _ in 0..opt.clients {
let handshake = endpoint.accept().await.unwrap();
let connection = handshake.await.context("handshake failed")?;

server_tasks.push(tokio::spawn(async move {
loop {
let (mut send_stream, mut recv_stream) = match connection.accept_bi().await {
Err(quinn::ConnectionError::ApplicationClosed(_)) => break,
Err(e) => {
eprintln!("accepting stream failed: {e:?}");
break;
}
Ok(stream) => stream,
};
trace!("stream established");

tokio::spawn(async move {
drain_stream(&mut recv_stream, opt.read_unordered).await?;
send_data_on_stream(&mut send_stream, opt.download_size).await?;
Ok::<_, anyhow::Error>(())
});
}

if opt.stats {
println!("\nServer connection stats:\n{:#?}", connection.stats());
}
}));
}

// Await all the tasks. We have to do this to prevent the runtime getting dropped
// and all server tasks to be cancelled
for handle in server_tasks {
if let Err(e) = handle.await {
eprintln!("Server task error: {e:?}");
};
}

Ok(())
}

async fn client(server_addr: PeerAddr, opt: Opt) -> Result<ClientStats> {
let (endpoint, connection) = connect_client(server_addr, opt).await?;

let start = Instant::now();

let connection = Arc::new(connection);

let mut stats = ClientStats::default();
let mut first_error = None;

let sem = Arc::new(Semaphore::new(opt.max_streams));
let results = Arc::new(Mutex::new(Vec::new()));
for _ in 0..opt.streams {
let permit = sem.clone().acquire_owned().await.unwrap();
let results = results.clone();
let connection = connection.clone();
tokio::spawn(async move {
let result =
handle_client_stream(connection, opt.upload_size, opt.read_unordered).await;
info!("stream finished: {:?}", result);
results.lock().unwrap().push(result);
drop(permit);
});
}

// Wait for remaining streams to finish
let _ = sem.acquire_many(opt.max_streams as u32).await.unwrap();

for result in results.lock().unwrap().drain(..) {
match result {
Ok((upload_result, download_result)) => {
stats.upload_stats.stream_finished(upload_result);
stats.download_stats.stream_finished(download_result);
}
Err(e) => {
if first_error.is_none() {
first_error = Some(e);
}
}
}
}

stats.upload_stats.total_duration = start.elapsed();
stats.download_stats.total_duration = start.elapsed();

// Explicit close of the connection, since handles can still be around due
// to `Arc`ing them
connection.close(0u32.into(), b"Benchmark done");

endpoint.close(0u32.into(), b"").await?;

if opt.stats {
println!("\nClient connection stats:\n{:#?}", connection.stats());
}

match first_error {
None => Ok(stats),
Some(e) => Err(e),
}
}

async fn handle_client_stream(
connection: Arc<quinn::Connection>,
upload_size: u64,
read_unordered: bool,
) -> Result<(TransferResult, TransferResult)> {
let start = Instant::now();

let (mut send_stream, mut recv_stream) = connection
.open_bi()
.await
.context("failed to open stream")?;

send_data_on_stream(&mut send_stream, upload_size).await?;

let upload_result = TransferResult::new(start.elapsed(), upload_size);

let start = Instant::now();
let size = drain_stream(&mut recv_stream, read_unordered).await?;
let download_result = TransferResult::new(start.elapsed(), size as u64);

Ok((upload_result, download_result))
}

#[derive(Default)]
struct ClientStats {
upload_stats: Stats,
download_stats: Stats,
}

impl ClientStats {
pub fn print(&self, client_id: usize) {
println!();
println!("Client {client_id} stats:");

if self.upload_stats.total_size != 0 {
self.upload_stats.print("upload");
}

if self.download_stats.total_size != 0 {
self.download_stats.print("download");
}
}
}
Loading
Loading