Skip to content

Commit

Permalink
muxers/mplex: Format with rustfmt
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed Aug 9, 2021
1 parent 22de20f commit 7957253
Show file tree
Hide file tree
Showing 6 changed files with 566 additions and 321 deletions.
53 changes: 30 additions & 23 deletions muxers/mplex/benches/split_send_size.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
use async_std::task;
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
use futures::channel::oneshot;
use futures::prelude::*;
use futures::future::poll_fn;
use libp2p_core::{PeerId, Transport, StreamMuxer, identity, upgrade, transport, muxing, multiaddr::multiaddr, Multiaddr};
use futures::prelude::*;
use libp2p_core::{
identity, multiaddr::multiaddr, muxing, transport, upgrade, Multiaddr, PeerId, StreamMuxer,
Transport,
};
use libp2p_mplex as mplex;
use libp2p_plaintext::PlainText2Config;
use std::time::Duration;
Expand All @@ -51,14 +54,13 @@ fn prepare(c: &mut Criterion) {
let payload: Vec<u8> = vec![1; 1024 * 1024 * 1];

let mut tcp = c.benchmark_group("tcp");
let tcp_addr = multiaddr![Ip4(std::net::Ipv4Addr::new(127,0,0,1)), Tcp(0u16)];
let tcp_addr = multiaddr![Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)), Tcp(0u16)];
for &size in BENCH_SIZES.iter() {
tcp.throughput(Throughput::Bytes(payload.len() as u64));
let trans = tcp_transport(size);
tcp.bench_function(
format!("{}", size),
|b| b.iter(|| run(black_box(&trans), black_box(&payload), black_box(&tcp_addr)))
);
tcp.bench_function(format!("{}", size), |b| {
b.iter(|| run(black_box(&trans), black_box(&payload), black_box(&tcp_addr)))
});
}
tcp.finish();

Expand All @@ -67,15 +69,13 @@ fn prepare(c: &mut Criterion) {
for &size in BENCH_SIZES.iter() {
mem.throughput(Throughput::Bytes(payload.len() as u64));
let trans = mem_transport(size);
mem.bench_function(
format!("{}", size),
|b| b.iter(|| run(black_box(&trans), black_box(&payload), black_box(&mem_addr)))
);
mem.bench_function(format!("{}", size), |b| {
b.iter(|| run(black_box(&trans), black_box(&payload), black_box(&mem_addr)))
});
}
mem.finish();
}


/// Transfers the given payload between two nodes using the given transport.
fn run(transport: &BenchTransport, payload: &Vec<u8>, listen_addr: &Multiaddr) {
let mut listener = transport.clone().listen_on(listen_addr.clone()).unwrap();
Expand All @@ -101,18 +101,20 @@ fn run(transport: &BenchTransport, payload: &Vec<u8>, listen_addr: &Multiaddr) {
let end = off + std::cmp::min(buf.len() - off, 8 * 1024);
let n = poll_fn(|cx| {
conn.read_substream(cx, &mut s, &mut buf[off..end])
}).await.unwrap();
})
.await
.unwrap();
off += n;
if off == buf.len() {
return
return;
}
}
}
Ok(_) => panic!("Unexpected muxer event"),
Err(e) => panic!("Unexpected error: {:?}", e)
Err(e) => panic!("Unexpected error: {:?}", e),
}
}
_ => panic!("Unexpected listener event")
_ => panic!("Unexpected listener event"),
}
}
});
Expand All @@ -122,16 +124,20 @@ fn run(transport: &BenchTransport, payload: &Vec<u8>, listen_addr: &Multiaddr) {
let addr = addr_receiver.await.unwrap();
let (_peer, conn) = transport.clone().dial(addr).unwrap().await.unwrap();
let mut handle = conn.open_outbound();
let mut stream = poll_fn(|cx| conn.poll_outbound(cx, &mut handle)).await.unwrap();
let mut stream = poll_fn(|cx| conn.poll_outbound(cx, &mut handle))
.await
.unwrap();
let mut off = 0;
loop {
let n = poll_fn(|cx| {
conn.write_substream(cx, &mut stream, &payload[off..])
}).await.unwrap();
let n = poll_fn(|cx| conn.write_substream(cx, &mut stream, &payload[off..]))
.await
.unwrap();
off += n;
if off == payload.len() {
poll_fn(|cx| conn.flush_substream(cx, &mut stream)).await.unwrap();
return
poll_fn(|cx| conn.flush_substream(cx, &mut stream))
.await
.unwrap();
return;
}
}
});
Expand All @@ -147,7 +153,8 @@ fn tcp_transport(split_send_size: usize) -> BenchTransport {
let mut mplex = mplex::MplexConfig::default();
mplex.set_split_send_size(split_send_size);

libp2p_tcp::TcpConfig::new().nodelay(true)
libp2p_tcp::TcpConfig::new()
.nodelay(true)
.upgrade(upgrade::Version::V1)
.authenticate(PlainText2Config { local_public_key })
.multiplex(mplex)
Expand Down
Loading

0 comments on commit 7957253

Please sign in to comment.