From 543485efcce3f9b7ee8b844f73e21010d0d730b4 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 22 Nov 2023 18:53:19 +0100 Subject: [PATCH 01/23] feat: dynamic stream window --- Cargo.lock | 4 ++-- Cargo.toml | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4a7655a3004..eac058c31f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6761,10 +6761,10 @@ dependencies = [ [[package]] name = "yamux" version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0329ef377816896f014435162bb3711ea7a07729c23d0960e6f8048b21b8fe91" +source = "git+https://github.com/mxinden/rust-yamux?branch=dynamic-stream-window#95ab1d340ba4cc5c86a13052f5335d78f0d4300f" dependencies = [ "futures", + "futures-timer", "log", "nohash-hasher", "parking_lot", diff --git a/Cargo.toml b/Cargo.toml index 0603a22629b..affb572c38a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -130,6 +130,7 @@ unsigned-varint = { version = "0.8.0" } # we import via `rust-multiaddr`. # This is expected to stay here until we move `libp2p-identity` to a separate repository which makes the dependency relationship more obvious. libp2p-identity = { path = "identity" } +yamux = { git = "https://github.com/mxinden/rust-yamux", branch = "dynamic-stream-window" } [workspace.lints] rust.unreachable_pub = "warn" From 992b98dca75b3ed4d7af5b4896aed287a3bde048 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 22 Nov 2023 19:13:27 +0100 Subject: [PATCH 02/23] update yamux --- Cargo.lock | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eac058c31f2..d2552ae3e44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6761,10 +6761,9 @@ dependencies = [ [[package]] name = "yamux" version = "0.12.0" -source = "git+https://github.com/mxinden/rust-yamux?branch=dynamic-stream-window#95ab1d340ba4cc5c86a13052f5335d78f0d4300f" +source = "git+https://github.com/mxinden/rust-yamux?branch=dynamic-stream-window#1cabcecce3bfc36e6795cad435f926e661a73ee8" dependencies = [ "futures", - "futures-timer", "log", "nohash-hasher", "parking_lot", From 4ad240f7e64643075904757d6f20a121cffe6bf9 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 23 Nov 2023 11:01:17 +0100 Subject: [PATCH 03/23] Update yamux including connection limit --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index d2552ae3e44..18ab0283b01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6761,7 +6761,7 @@ dependencies = [ [[package]] name = "yamux" version = "0.12.0" -source = "git+https://github.com/mxinden/rust-yamux?branch=dynamic-stream-window#1cabcecce3bfc36e6795cad435f926e661a73ee8" +source = "git+https://github.com/mxinden/rust-yamux?branch=dynamic-stream-window#97068127f7460db20ea844d746236f4c4a2909fa" dependencies = [ "futures", "log", From 86569161fc18d7a9ad04addf5acc892d700df04c Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 23 Nov 2023 11:31:37 +0100 Subject: [PATCH 04/23] fix usize --- muxers/yamux/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index d10cdfa244c..3d561e2c873 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -274,7 +274,7 @@ impl Config { } /// Sets the size (in bytes) of the receive window per substream. - pub fn set_receive_window_size(&mut self, num_bytes: u32) -> &mut Self { + pub fn set_receive_window_size(&mut self, num_bytes: usize) -> &mut Self { self.inner.set_receive_window(num_bytes); self } From 7781fde5302efb3875982260c97ee4511cc5f801 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 23 Nov 2023 12:17:15 +0100 Subject: [PATCH 05/23] improve logging --- Cargo.lock | 2 +- protocols/perf/src/bin/perf.rs | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 18ab0283b01..d7af248c783 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6761,7 +6761,7 @@ dependencies = [ [[package]] name = "yamux" version = "0.12.0" -source = "git+https://github.com/mxinden/rust-yamux?branch=dynamic-stream-window#97068127f7460db20ea844d746236f4c4a2909fa" +source = "git+https://github.com/mxinden/rust-yamux?branch=dynamic-stream-window#e6200e79be53b7426b6fffdbedd6b349fca95c96" dependencies = [ "futures", "log", diff --git a/protocols/perf/src/bin/perf.rs b/protocols/perf/src/bin/perf.rs index 9ac8f0a6cde..e678fd12e53 100644 --- a/protocols/perf/src/bin/perf.rs +++ b/protocols/perf/src/bin/perf.rs @@ -71,9 +71,12 @@ impl FromStr for Transport { #[tokio::main] async fn main() -> Result<()> { - let _ = tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .try_init(); + let _ = + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::try_from_default_env().unwrap_or( + "info,yamux=debug".into(), + )) + .try_init(); let opts = Opts::parse(); match opts { From 3169acb119a62d80c2b2e78e12c5eb7b09a3de29 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 23 Nov 2023 14:10:15 +0100 Subject: [PATCH 06/23] write logs to stderr --- protocols/perf/src/bin/perf.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/protocols/perf/src/bin/perf.rs b/protocols/perf/src/bin/perf.rs index e678fd12e53..9f72c949b4c 100644 --- a/protocols/perf/src/bin/perf.rs +++ b/protocols/perf/src/bin/perf.rs @@ -76,6 +76,7 @@ async fn main() -> Result<()> { .with_env_filter(EnvFilter::try_from_default_env().unwrap_or( "info,yamux=debug".into(), )) + .with_writer(std::io::stderr) .try_init(); let opts = Opts::parse(); From bae5432c6f80ae68fcb3b675c89e7e5595c17665 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 23 Nov 2023 15:33:36 +0100 Subject: [PATCH 07/23] Unbounded stream receive window Depend on connection window limit --- Cargo.lock | 2 +- Cargo.toml | 2 +- muxers/yamux/src/lib.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d7af248c783..e0949f4bcee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6761,7 +6761,7 @@ dependencies = [ [[package]] name = "yamux" version = "0.12.0" -source = "git+https://github.com/mxinden/rust-yamux?branch=dynamic-stream-window#e6200e79be53b7426b6fffdbedd6b349fca95c96" +source = "git+https://github.com/libp2p/rust-yamux?branch=dynamic-stream-window#ca14cec55c93ac61b2c2f73ed0815ee617f0e89b" dependencies = [ "futures", "log", diff --git a/Cargo.toml b/Cargo.toml index affb572c38a..7499b9480df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -130,7 +130,7 @@ unsigned-varint = { version = "0.8.0" } # we import via `rust-multiaddr`. # This is expected to stay here until we move `libp2p-identity` to a separate repository which makes the dependency relationship more obvious. libp2p-identity = { path = "identity" } -yamux = { git = "https://github.com/mxinden/rust-yamux", branch = "dynamic-stream-window" } +yamux = { git = "https://github.com/libp2p/rust-yamux", branch = "dynamic-stream-window" } [workspace.lints] rust.unreachable_pub = "warn" diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 3d561e2c873..2c6d7c9788d 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -274,7 +274,7 @@ impl Config { } /// Sets the size (in bytes) of the receive window per substream. - pub fn set_receive_window_size(&mut self, num_bytes: usize) -> &mut Self { + pub fn set_receive_window_size(&mut self, num_bytes: Option) -> &mut Self { self.inner.set_receive_window(num_bytes); self } From eff5d8acf9eef806f95800d5f987b14ccb4564b6 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sat, 25 Nov 2023 12:00:49 +0100 Subject: [PATCH 08/23] Update yamux --- Cargo.lock | 4 +-- muxers/yamux/Cargo.toml | 2 +- muxers/yamux/src/lib.rs | 60 +++++------------------------------------ 3 files changed, 10 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e0949f4bcee..6238256dd7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6760,8 +6760,8 @@ dependencies = [ [[package]] name = "yamux" -version = "0.12.0" -source = "git+https://github.com/libp2p/rust-yamux?branch=dynamic-stream-window#ca14cec55c93ac61b2c2f73ed0815ee617f0e89b" +version = "0.13.0" +source = "git+https://github.com/libp2p/rust-yamux?branch=dynamic-stream-window#68d213297f962c58c9d2f4571837c6bbd9eb07d0" dependencies = [ "futures", "log", diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index ec3d4b85c5b..836593333b4 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] futures = "0.3.29" libp2p-core = { workspace = true } thiserror = "1.0" -yamux = "0.12" +yamux = "0.13" tracing = "0.1.37" [dev-dependencies] diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 2c6d7c9788d..11a87ff79d7 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -214,46 +214,6 @@ pub struct Config { mode: Option, } -/// The window update mode determines when window updates are -/// sent to the remote, giving it new credit to send more data. -pub struct WindowUpdateMode(yamux::WindowUpdateMode); - -impl WindowUpdateMode { - /// The window update mode whereby the remote is given - /// new credit via a window update whenever the current - /// receive window is exhausted when data is received, - /// i.e. this mode cannot exert back-pressure from application - /// code that is slow to read from a substream. - /// - /// > **Note**: The receive buffer may overflow with this - /// > strategy if the receiver is too slow in reading the - /// > data from the buffer. The maximum receive buffer - /// > size must be tuned appropriately for the desired - /// > throughput and level of tolerance for (temporarily) - /// > slow receivers. - pub fn on_receive() -> Self { - WindowUpdateMode(yamux::WindowUpdateMode::OnReceive) - } - - /// The window update mode whereby the remote is given new - /// credit only when the current receive window is exhausted - /// when data is read from the substream's receive buffer, - /// i.e. application code that is slow to read from a substream - /// exerts back-pressure on the remote. - /// - /// > **Note**: If the receive window of a substream on - /// > both peers is exhausted and both peers are blocked on - /// > sending data before reading from the stream, a deadlock - /// > occurs. To avoid this situation, reading from a substream - /// > should never be blocked on writing to the same substream. - /// - /// > **Note**: With this strategy, there is usually no point in the - /// > receive buffer being larger than the window size. - pub fn on_read() -> Self { - WindowUpdateMode(yamux::WindowUpdateMode::OnRead) - } -} - impl Config { /// Creates a new `YamuxConfig` in client mode, regardless of whether /// it will be used for an inbound or outbound upgrade. @@ -273,15 +233,16 @@ impl Config { } } - /// Sets the size (in bytes) of the receive window per substream. - pub fn set_receive_window_size(&mut self, num_bytes: Option) -> &mut Self { - self.inner.set_receive_window(num_bytes); + /// Sets the size (in bytes) of the receive window per stream. + pub fn set_max_stream_receive_window(&mut self, num_bytes: Option) -> &mut Self { + self.inner.set_max_stream_receive_window(num_bytes); self } - /// Sets the maximum size (in bytes) of the receive buffer per substream. - pub fn set_max_buffer_size(&mut self, num_bytes: usize) -> &mut Self { - self.inner.set_max_buffer_size(num_bytes); + /// Sets the size (in bytes) of the receive window per connection. + pub fn set_max_connection_receive_window(&mut self, num_bytes: Option) -> &mut Self { + self.inner + .set_max_connection_receive_window(num_bytes.expect("todo")); self } @@ -290,13 +251,6 @@ impl Config { self.inner.set_max_num_streams(num_streams); self } - - /// Sets the window update mode that determines when the remote - /// is given new credit for sending more data. - pub fn set_window_update_mode(&mut self, mode: WindowUpdateMode) -> &mut Self { - self.inner.set_window_update_mode(mode.0); - self - } } impl Default for Config { From 75d9844b2f51ea2a4a77a1cc102fc23996c8391a Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 1 Dec 2023 14:17:11 +0100 Subject: [PATCH 09/23] Support both v012 and v013 --- Cargo.lock | 19 ++- muxers/yamux/Cargo.toml | 4 +- muxers/yamux/src/lib.rs | 278 ++++++++++++++++++++++++++++++++-------- 3 files changed, 244 insertions(+), 57 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6238256dd7a..acc2f0fa50c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3363,12 +3363,14 @@ name = "libp2p-yamux" version = "0.45.0" dependencies = [ "async-std", + "either", "futures", "libp2p-core", "libp2p-muxer-test-harness", "thiserror", "tracing", - "yamux", + "yamux 0.12.1", + "yamux 0.13.0", ] [[package]] @@ -6758,6 +6760,21 @@ dependencies = [ "xml-rs", ] +[[package]] +name = "yamux" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed0164ae619f2dc144909a9f082187ebb5893693d8c0196e8085283ccd4b776" +dependencies = [ + "futures", + "log", + "nohash-hasher", + "parking_lot", + "pin-project", + "rand 0.8.5", + "static_assertions", +] + [[package]] name = "yamux" version = "0.13.0" diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index 836593333b4..2ffbf69e2ca 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -11,10 +11,12 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] +either = "1" futures = "0.3.29" libp2p-core = { workspace = true } thiserror = "1.0" -yamux = "0.13" +yamux012 = { version = "0.12.1", package = "yamux" } +yamux013 = { version = "0.13", package = "yamux" } tracing = "0.1.37" [dev-dependencies] diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 11a87ff79d7..c644ab9a556 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -22,6 +22,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +use either::Either; use futures::{future, prelude::*, ready}; use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade, UpgradeInfo}; @@ -34,12 +35,11 @@ use std::{ task::{Context, Poll}, }; use thiserror::Error; -use yamux::ConnectionError; /// A Yamux connection. #[derive(Debug)] pub struct Muxer { - connection: yamux::Connection, + connection: Either, yamux013::Connection>, /// Temporarily buffers inbound streams in case our node is performing backpressure on the remote. /// /// The only way how yamux can make progress is by calling [`yamux::Connection::poll_next_inbound`]. However, the @@ -65,9 +65,9 @@ where C: AsyncRead + AsyncWrite + Send + Unpin + 'static, { /// Create a new Yamux connection. - fn new(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { + fn new(connection: Either, yamux013::Connection>) -> Self { Muxer { - connection: yamux::Connection::new(io, cfg, mode), + connection, inbound_stream_buffer: VecDeque::default(), inbound_stream_waker: None, } @@ -103,14 +103,24 @@ where mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - let stream = ready!(self.connection.poll_new_outbound(cx).map_err(Error)?); - - Poll::Ready(Ok(Stream(stream))) + let stream = ready!(either::for_both!( + self.connection.as_mut(), + c => c + .poll_new_outbound(cx) + .map_err(Error::from) + .map_ok(Stream::from) + )?); + Poll::Ready(Ok(stream)) } #[tracing::instrument(level = "trace", name = "StreamMuxer::poll_close", skip(self, cx))] fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.connection.poll_close(cx).map_err(Error)?); + ready!(either::for_both!( + self.connection.as_mut(), + c => c + .poll_close(cx) + .map_err(Error::from) + )?); Poll::Ready(Ok(())) } @@ -146,7 +156,19 @@ where /// A stream produced by the yamux multiplexer. #[derive(Debug)] -pub struct Stream(yamux::Stream); +pub struct Stream(Either); + +impl From for Stream { + fn from(value: yamux012::Stream) -> Self { + Self(Either::Left(value)) + } +} + +impl From for Stream { + fn from(value: yamux013::Stream) -> Self { + Self(Either::Right(value)) + } +} impl AsyncRead for Stream { fn poll_read( @@ -154,7 +176,7 @@ impl AsyncRead for Stream { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - Pin::new(&mut self.0).poll_read(cx, buf) + either::for_both!(self.0.as_mut(), s => Pin::new(s).poll_read(cx, buf)) } fn poll_read_vectored( @@ -162,7 +184,7 @@ impl AsyncRead for Stream { cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll> { - Pin::new(&mut self.0).poll_read_vectored(cx, bufs) + either::for_both!(self.0.as_mut(), s => Pin::new(s).poll_read_vectored(cx, bufs)) } } @@ -172,7 +194,7 @@ impl AsyncWrite for Stream { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - Pin::new(&mut self.0).poll_write(cx, buf) + either::for_both!(self.0.as_mut(), s => Pin::new(s).poll_write(cx, buf)) } fn poll_write_vectored( @@ -180,15 +202,15 @@ impl AsyncWrite for Stream { cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll> { - Pin::new(&mut self.0).poll_write_vectored(cx, bufs) + either::for_both!(self.0.as_mut(), s => Pin::new(s).poll_write_vectored(cx, bufs)) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.0).poll_flush(cx) + either::for_both!(self.0.as_mut(), s => Pin::new(s).poll_flush(cx)) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.0).poll_close(cx) + either::for_both!(self.0.as_mut(), s => Pin::new(s).poll_close(cx)) } } @@ -197,69 +219,87 @@ where C: AsyncRead + AsyncWrite + Unpin + 'static, { fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll> { - let stream = ready!(self.connection.poll_next_inbound(cx)) - .transpose() - .map_err(Error)? - .map(Stream) - .ok_or(Error(ConnectionError::Closed))?; + let stream = match self.connection.as_mut() { + Either::Left(c) => ready!(c.poll_next_inbound(cx)) + .ok_or(Error::from(yamux012::ConnectionError::Closed))? + .map_err(Error::from) + .map(Stream::from)?, + Either::Right(c) => ready!(c.poll_next_inbound(cx)) + .ok_or(Error::from(yamux013::ConnectionError::Closed))? + .map_err(Error::from) + .map(Stream::from)?, + }; Poll::Ready(Ok(stream)) } } /// The yamux configuration. -#[derive(Debug, Clone)] -pub struct Config { - inner: yamux::Config, - mode: Option, +pub struct Config(Either); + +impl Default for Config { + fn default() -> Self { + Self(Either::Right(Config013::default())) + } } impl Config { + // TODO: deprecate /// Creates a new `YamuxConfig` in client mode, regardless of whether /// it will be used for an inbound or outbound upgrade. pub fn client() -> Self { - Self { - mode: Some(yamux::Mode::Client), + Self(Either::Left(Config012 { + mode: Some(yamux012::Mode::Client), ..Default::default() - } + })) } + // TODO: deprecate /// Creates a new `YamuxConfig` in server mode, regardless of whether /// it will be used for an inbound or outbound upgrade. pub fn server() -> Self { - Self { - mode: Some(yamux::Mode::Server), + Self(Either::Left(Config012 { + mode: Some(yamux012::Mode::Server), ..Default::default() - } + })) } - /// Sets the size (in bytes) of the receive window per stream. - pub fn set_max_stream_receive_window(&mut self, num_bytes: Option) -> &mut Self { - self.inner.set_max_stream_receive_window(num_bytes); + fn set(&mut self, f: impl FnOnce(&mut yamux012::Config) -> &mut yamux012::Config) -> &mut Self { + let mut cfg012 = match self.0.as_mut() { + Either::Left(c) => &mut c.inner, + Either::Right(_) => { + self.0 = Either::Left(Config012::default()); + &mut self.0.as_mut().unwrap_left().inner + } + }; + + f(&mut cfg012); + self } - /// Sets the size (in bytes) of the receive window per connection. - pub fn set_max_connection_receive_window(&mut self, num_bytes: Option) -> &mut Self { - self.inner - .set_max_connection_receive_window(num_bytes.expect("todo")); - self + // TODO: deprecate + /// Sets the size (in bytes) of the receive w``indow per substream. + pub fn set_receive_window_size(&mut self, num_bytes: u32) -> &mut Self { + self.set(|cfg| cfg.set_receive_window(num_bytes)) + } + + // TODO: deprecate + /// Sets the maximum size (in bytes) of the receive buffer per substream. + pub fn set_max_buffer_size(&mut self, num_bytes: usize) -> &mut Self { + self.set(|cfg| cfg.set_max_buffer_size(num_bytes)) } /// Sets the maximum number of concurrent substreams. pub fn set_max_num_streams(&mut self, num_streams: usize) -> &mut Self { - self.inner.set_max_num_streams(num_streams); - self + self.set(|cfg| cfg.set_max_num_streams(num_streams)) } -} -impl Default for Config { - fn default() -> Self { - let mut inner = yamux::Config::default(); - // For conformity with mplex, read-after-close on a multiplexed - // connection is never permitted and not configurable. - inner.set_read_after_close(false); - Config { inner, mode: None } + // TODO: deprecate + /// Sets the window update mode that determines when the remote + /// is given new credit for sending more data. + pub fn set_window_update_mode(&mut self, mode: WindowUpdateMode) -> &mut Self { + self.set(|cfg| cfg.set_window_update_mode(mode.0)) } } @@ -281,8 +321,18 @@ where type Future = future::Ready>; fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future { - let mode = self.mode.unwrap_or(yamux::Mode::Server); - future::ready(Ok(Muxer::new(io, self.inner, mode))) + let connection = match self.0 { + Either::Left(Config012 { inner, mode }) => Either::Left(yamux012::Connection::new( + io, + inner, + mode.unwrap_or(yamux012::Mode::Server), + )), + Either::Right(Config013(cfg)) => { + Either::Right(yamux013::Connection::new(io, cfg, yamux013::Mode::Server)) + } + }; + + future::ready(Ok(Muxer::new(connection))) } } @@ -295,21 +345,139 @@ where type Future = future::Ready>; fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future { - let mode = self.mode.unwrap_or(yamux::Mode::Client); - future::ready(Ok(Muxer::new(io, self.inner, mode))) + let connection = match self.0 { + Either::Left(Config012 { inner, mode }) => Either::Left(yamux012::Connection::new( + io, + inner, + mode.unwrap_or(yamux012::Mode::Client), + )), + Either::Right(Config013(cfg)) => { + Either::Right(yamux013::Connection::new(io, cfg, yamux013::Mode::Client)) + } + }; + + future::ready(Ok(Muxer::new(connection))) + } +} + +#[derive(Debug, Clone)] +struct Config012 { + inner: yamux012::Config, + mode: Option, +} + +impl Default for Config012 { + fn default() -> Self { + let mut inner = yamux012::Config::default(); + // For conformity with mplex, read-after-close on a multiplexed + // connection is never permitted and not configurable. + inner.set_read_after_close(false); + Self { inner, mode: None } + } +} + +/// The window update mode determines when window updates are +/// sent to the remote, giving it new credit to send more data. +pub struct WindowUpdateMode(yamux012::WindowUpdateMode); + +impl WindowUpdateMode { + /// The window update mode whereby the remote is given + /// new credit via a window update whenever the current + /// receive window is exhausted when data is received, + /// i.e. this mode cannot exert back-pressure from application + /// code that is slow to read from a substream. + /// + /// > **Note**: The receive buffer may overflow with this + /// > strategy if the receiver is too slow in reading the + /// > data from the buffer. The maximum receive buffer + /// > size must be tuned appropriately for the desired + /// > throughput and level of tolerance for (temporarily) + /// > slow receivers. + #[deprecated(note = "Use `WindowUpdateMode::on_read` instead.")] + pub fn on_receive() -> Self { + #[allow(deprecated)] + WindowUpdateMode(yamux012::WindowUpdateMode::OnReceive) + } + + /// The window update mode whereby the remote is given new + /// credit only when the current receive window is exhausted + /// when data is read from the substream's receive buffer, + /// i.e. application code that is slow to read from a substream + /// exerts back-pressure on the remote. + /// + /// > **Note**: If the receive window of a substream on + /// > both peers is exhausted and both peers are blocked on + /// > sending data before reading from the stream, a deadlock + /// > occurs. To avoid this situation, reading from a substream + /// > should never be blocked on writing to the same substream. + /// + /// > **Note**: With this strategy, there is usually no point in the + /// > receive buffer being larger than the window size. + pub fn on_read() -> Self { + WindowUpdateMode(yamux012::WindowUpdateMode::OnRead) + } +} + +#[derive(Debug, Clone)] +struct Config013(yamux013::Config); + +impl Default for Config013 { + fn default() -> Self { + let mut cfg = yamux013::Config::default(); + // For conformity with mplex, read-after-close on a multiplexed + // connection is never permitted and not configurable. + cfg.set_read_after_close(false); + Self(cfg) } } /// The Yamux [`StreamMuxer`] error type. #[derive(Debug, Error)] #[error(transparent)] -pub struct Error(yamux::ConnectionError); +pub struct Error(Either); + +impl From for Error { + fn from(value: yamux012::ConnectionError) -> Self { + Error(Either::Left(value)) + } +} + +impl From for Error { + fn from(value: yamux013::ConnectionError) -> Self { + Error(Either::Right(value)) + } +} impl From for io::Error { fn from(err: Error) -> Self { match err.0 { - yamux::ConnectionError::Io(e) => e, - e => io::Error::new(io::ErrorKind::Other, e), + Either::Left(err) => match err { + yamux012::ConnectionError::Io(e) => e, + e => io::Error::new(io::ErrorKind::Other, e), + }, + Either::Right(err) => match err { + yamux013::ConnectionError::Io(e) => e, + e => io::Error::new(io::ErrorKind::Other, e), + }, } } } + +#[cfg(test)] +mod test { + use super::*; + #[test] + fn config_set_switches_to_v012() { + // By default we use yamux v0.13. Thus we provide the benefits of yamux v0.13 to all users + // that do not depend on any of the behaviors (i.e. configuration options) of v0.12. + let mut cfg = Config::default(); + assert!(matches!( + cfg, + Config(Either::Right(Config013(yamux013::Config { .. }))) + )); + + // In case a user makes any configurations, use yamux v0.12 instead. + cfg.set_max_num_streams(42); + assert!(matches!(cfg, Config(Either::Left(Config012 { .. })))); + } +} From 79afb85856e3e64b0d994fb50c3ead0024df917c Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 1 Dec 2023 18:13:37 +0100 Subject: [PATCH 10/23] reduce diff --- muxers/yamux/src/lib.rs | 116 ++++++++++++++++++++-------------------- 1 file changed, 58 insertions(+), 58 deletions(-) diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index c644ab9a556..e9f30a90c1c 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -243,6 +243,64 @@ impl Default for Config { } } +#[derive(Debug, Clone)] +struct Config012 { + inner: yamux012::Config, + mode: Option, +} + +impl Default for Config012 { + fn default() -> Self { + let mut inner = yamux012::Config::default(); + // For conformity with mplex, read-after-close on a multiplexed + // connection is never permitted and not configurable. + inner.set_read_after_close(false); + Self { inner, mode: None } + } +} + +/// The window update mode determines when window updates are +/// sent to the remote, giving it new credit to send more data. +pub struct WindowUpdateMode(yamux012::WindowUpdateMode); + +impl WindowUpdateMode { + /// The window update mode whereby the remote is given + /// new credit via a window update whenever the current + /// receive window is exhausted when data is received, + /// i.e. this mode cannot exert back-pressure from application + /// code that is slow to read from a substream. + /// + /// > **Note**: The receive buffer may overflow with this + /// > strategy if the receiver is too slow in reading the + /// > data from the buffer. The maximum receive buffer + /// > size must be tuned appropriately for the desired + /// > throughput and level of tolerance for (temporarily) + /// > slow receivers. + #[deprecated(note = "Use `WindowUpdateMode::on_read` instead.")] + pub fn on_receive() -> Self { + #[allow(deprecated)] + WindowUpdateMode(yamux012::WindowUpdateMode::OnReceive) + } + + /// The window update mode whereby the remote is given new + /// credit only when the current receive window is exhausted + /// when data is read from the substream's receive buffer, + /// i.e. application code that is slow to read from a substream + /// exerts back-pressure on the remote. + /// + /// > **Note**: If the receive window of a substream on + /// > both peers is exhausted and both peers are blocked on + /// > sending data before reading from the stream, a deadlock + /// > occurs. To avoid this situation, reading from a substream + /// > should never be blocked on writing to the same substream. + /// + /// > **Note**: With this strategy, there is usually no point in the + /// > receive buffer being larger than the window size. + pub fn on_read() -> Self { + WindowUpdateMode(yamux012::WindowUpdateMode::OnRead) + } +} + impl Config { // TODO: deprecate /// Creates a new `YamuxConfig` in client mode, regardless of whether @@ -360,64 +418,6 @@ where } } -#[derive(Debug, Clone)] -struct Config012 { - inner: yamux012::Config, - mode: Option, -} - -impl Default for Config012 { - fn default() -> Self { - let mut inner = yamux012::Config::default(); - // For conformity with mplex, read-after-close on a multiplexed - // connection is never permitted and not configurable. - inner.set_read_after_close(false); - Self { inner, mode: None } - } -} - -/// The window update mode determines when window updates are -/// sent to the remote, giving it new credit to send more data. -pub struct WindowUpdateMode(yamux012::WindowUpdateMode); - -impl WindowUpdateMode { - /// The window update mode whereby the remote is given - /// new credit via a window update whenever the current - /// receive window is exhausted when data is received, - /// i.e. this mode cannot exert back-pressure from application - /// code that is slow to read from a substream. - /// - /// > **Note**: The receive buffer may overflow with this - /// > strategy if the receiver is too slow in reading the - /// > data from the buffer. The maximum receive buffer - /// > size must be tuned appropriately for the desired - /// > throughput and level of tolerance for (temporarily) - /// > slow receivers. - #[deprecated(note = "Use `WindowUpdateMode::on_read` instead.")] - pub fn on_receive() -> Self { - #[allow(deprecated)] - WindowUpdateMode(yamux012::WindowUpdateMode::OnReceive) - } - - /// The window update mode whereby the remote is given new - /// credit only when the current receive window is exhausted - /// when data is read from the substream's receive buffer, - /// i.e. application code that is slow to read from a substream - /// exerts back-pressure on the remote. - /// - /// > **Note**: If the receive window of a substream on - /// > both peers is exhausted and both peers are blocked on - /// > sending data before reading from the stream, a deadlock - /// > occurs. To avoid this situation, reading from a substream - /// > should never be blocked on writing to the same substream. - /// - /// > **Note**: With this strategy, there is usually no point in the - /// > receive buffer being larger than the window size. - pub fn on_read() -> Self { - WindowUpdateMode(yamux012::WindowUpdateMode::OnRead) - } -} - #[derive(Debug, Clone)] struct Config013(yamux013::Config); From 5bc7112329af865d35d8f3389d679093cd560a67 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 5 Dec 2023 16:40:34 +0100 Subject: [PATCH 11/23] Remove From impls --- muxers/yamux/src/lib.rs | 63 ++++++++++++----------------------------- 1 file changed, 18 insertions(+), 45 deletions(-) diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index e9f30a90c1c..38493f629e1 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -103,26 +103,23 @@ where mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - let stream = ready!(either::for_both!( - self.connection.as_mut(), - c => c - .poll_new_outbound(cx) - .map_err(Error::from) - .map_ok(Stream::from) - )?); + let stream = match self.connection.as_mut() { + Either::Left(c) => ready!(c.poll_new_outbound(cx)) + .map_err(|e| Error(Either::Left(e))) + .map(|s| Stream(Either::Left(s))), + Either::Right(c) => ready!(c.poll_new_outbound(cx)) + .map_err(|e| Error(Either::Right(e))) + .map(|s| Stream(Either::Right(s))), + }?; Poll::Ready(Ok(stream)) } #[tracing::instrument(level = "trace", name = "StreamMuxer::poll_close", skip(self, cx))] fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(either::for_both!( - self.connection.as_mut(), - c => c - .poll_close(cx) - .map_err(Error::from) - )?); - - Poll::Ready(Ok(())) + match self.connection.as_mut() { + Either::Left(c) => c.poll_close(cx).map_err(|e| Error(Either::Left(e))), + Either::Right(c) => c.poll_close(cx).map_err(|e| Error(Either::Right(e))), + } } #[tracing::instrument(level = "trace", name = "StreamMuxer::poll", skip(self, cx))] @@ -158,18 +155,6 @@ where #[derive(Debug)] pub struct Stream(Either); -impl From for Stream { - fn from(value: yamux012::Stream) -> Self { - Self(Either::Left(value)) - } -} - -impl From for Stream { - fn from(value: yamux013::Stream) -> Self { - Self(Either::Right(value)) - } -} - impl AsyncRead for Stream { fn poll_read( mut self: Pin<&mut Self>, @@ -221,13 +206,13 @@ where fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll> { let stream = match self.connection.as_mut() { Either::Left(c) => ready!(c.poll_next_inbound(cx)) - .ok_or(Error::from(yamux012::ConnectionError::Closed))? - .map_err(Error::from) - .map(Stream::from)?, + .ok_or(Error(Either::Left(yamux012::ConnectionError::Closed)))? + .map_err(|e| Error(Either::Left(e))) + .map(|s| Stream(Either::Left(s)))?, Either::Right(c) => ready!(c.poll_next_inbound(cx)) - .ok_or(Error::from(yamux013::ConnectionError::Closed))? - .map_err(Error::from) - .map(Stream::from)?, + .ok_or(Error(Either::Right(yamux013::ConnectionError::Closed)))? + .map_err(|e| Error(Either::Right(e))) + .map(|s| Stream(Either::Right(s)))?, }; Poll::Ready(Ok(stream)) @@ -436,18 +421,6 @@ impl Default for Config013 { #[error(transparent)] pub struct Error(Either); -impl From for Error { - fn from(value: yamux012::ConnectionError) -> Self { - Error(Either::Left(value)) - } -} - -impl From for Error { - fn from(value: yamux013::ConnectionError) -> Self { - Error(Either::Right(value)) - } -} - impl From for io::Error { fn from(err: Error) -> Self { match err.0 { From 5f64814947453b10c060ce66e02d4e43d2c1f217 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 5 Dec 2023 18:18:48 +0100 Subject: [PATCH 12/23] Debug and Clone for Config --- muxers/yamux/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 38493f629e1..2b95645ec4a 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -220,6 +220,7 @@ where } /// The yamux configuration. +#[derive(Debug, Clone)] pub struct Config(Either); impl Default for Config { From 651c0fc5cc82a9038f8fb1a39341b5f826876ad8 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 5 Dec 2023 18:23:17 +0100 Subject: [PATCH 13/23] Deprecate v0.12 methods --- muxers/yamux/src/lib.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 2b95645ec4a..2f43ad453c2 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -288,9 +288,9 @@ impl WindowUpdateMode { } impl Config { - // TODO: deprecate /// Creates a new `YamuxConfig` in client mode, regardless of whether /// it will be used for an inbound or outbound upgrade. + #[deprecated(note = "Will be removed with the next breaking release.")] pub fn client() -> Self { Self(Either::Left(Config012 { mode: Some(yamux012::Mode::Client), @@ -298,9 +298,9 @@ impl Config { })) } - // TODO: deprecate /// Creates a new `YamuxConfig` in server mode, regardless of whether /// it will be used for an inbound or outbound upgrade. + #[deprecated(note = "Will be removed with the next breaking release.")] pub fn server() -> Self { Self(Either::Left(Config012 { mode: Some(yamux012::Mode::Server), @@ -322,14 +322,16 @@ impl Config { self } - // TODO: deprecate /// Sets the size (in bytes) of the receive w``indow per substream. + #[deprecated( + note = "Will be replaced in the next breaking release with a connection receive window size limit." + )] pub fn set_receive_window_size(&mut self, num_bytes: u32) -> &mut Self { self.set(|cfg| cfg.set_receive_window(num_bytes)) } - // TODO: deprecate /// Sets the maximum size (in bytes) of the receive buffer per substream. + #[deprecated(note = "Will be removed with the next breaking release.")] pub fn set_max_buffer_size(&mut self, num_bytes: usize) -> &mut Self { self.set(|cfg| cfg.set_max_buffer_size(num_bytes)) } @@ -339,9 +341,9 @@ impl Config { self.set(|cfg| cfg.set_max_num_streams(num_streams)) } - // TODO: deprecate /// Sets the window update mode that determines when the remote /// is given new credit for sending more data. + #[deprecated(note = "Will be removed with the next breaking release.")] pub fn set_window_update_mode(&mut self, mode: WindowUpdateMode) -> &mut Self { self.set(|cfg| cfg.set_window_update_mode(mode.0)) } From aafbda74cbfd25fd5315dfb4cce4d0e8887b96ae Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 5 Dec 2023 18:27:07 +0100 Subject: [PATCH 14/23] Add changelog entry --- muxers/yamux/CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/muxers/yamux/CHANGELOG.md b/muxers/yamux/CHANGELOG.md index c8534166ea6..de608b195f8 100644 --- a/muxers/yamux/CHANGELOG.md +++ b/muxers/yamux/CHANGELOG.md @@ -4,6 +4,12 @@ It does not enforce flow-control, i.e. breaks backpressure. Use `WindowUpdateMode::on_read` instead. See `yamux` crate version `v0.12.1` and [Yamux PR #177](https://github.com/libp2p/rust-yamux/pull/177). +- `yamux` `v0.13` enables auto-tuning for the Yamux stream receive window. + While preserving small buffers on low-latency and/or low-bandwidth connections, this change allows for high-latency and/or high-bandwidth connections to exhaust the available bandwidth on a single stream. + Have `libp2p-yamux` use `yamux` `v0.13` (new version) by default and fall back to `yamux` `v0.12` (old version) when setting any configuration options. + Thus default users benefit from the increased performance, while power users with custom configurations maintain the old behavior. + `libp2p-yamux` will switch over to `yamux` `v0.13` entirely with the next breaking release. + See [PR 4970](https://github.com/libp2p/rust-libp2p/pull/4970). ## 0.45.0 From 309a3d3a4a4a1e20002966bca4128457b764b1e8 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 5 Dec 2023 18:32:23 +0100 Subject: [PATCH 15/23] Update yamux --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 51b536f18a1..a7e459fa1c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6867,7 +6867,7 @@ dependencies = [ [[package]] name = "yamux" version = "0.13.0" -source = "git+https://github.com/libp2p/rust-yamux?branch=dynamic-stream-window#b9b74db02335ee62660688db406eeaff2ea171d1" +source = "git+https://github.com/libp2p/rust-yamux?branch=dynamic-stream-window#6b64a9fad14b6744e817e0fbc92f2c6126461f71" dependencies = [ "futures", "log", From 13ac679d393c1faabc92d59215d435796d974235 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 5 Dec 2023 18:35:55 +0100 Subject: [PATCH 16/23] fmt --- protocols/perf/src/bin/perf.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/protocols/perf/src/bin/perf.rs b/protocols/perf/src/bin/perf.rs index 9f72c949b4c..117b1665d93 100644 --- a/protocols/perf/src/bin/perf.rs +++ b/protocols/perf/src/bin/perf.rs @@ -71,13 +71,10 @@ impl FromStr for Transport { #[tokio::main] async fn main() -> Result<()> { - let _ = - tracing_subscriber::fmt() - .with_env_filter(EnvFilter::try_from_default_env().unwrap_or( - "info,yamux=debug".into(), - )) - .with_writer(std::io::stderr) - .try_init(); + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::try_from_default_env().unwrap_or("info,yamux=debug".into())) + .with_writer(std::io::stderr) + .try_init(); let opts = Opts::parse(); match opts { From 73a3f0761539f2def106846d1c1b8777d7abaf81 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 5 Dec 2023 18:36:36 +0100 Subject: [PATCH 17/23] Fix doc link --- muxers/yamux/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 2f43ad453c2..213bee51027 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -42,7 +42,7 @@ pub struct Muxer { connection: Either, yamux013::Connection>, /// Temporarily buffers inbound streams in case our node is performing backpressure on the remote. /// - /// The only way how yamux can make progress is by calling [`yamux::Connection::poll_next_inbound`]. However, the + /// The only way how yamux can make progress is by calling [`yamux013::Connection::poll_next_inbound`]. However, the /// [`StreamMuxer`] interface is designed to allow a caller to selectively make progress via /// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the more general /// [`StreamMuxer::poll`] is designed to make progress on existing streams etc. From d5029cb57a414d78b7eb5fd6b9888cb86129737d Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 5 Dec 2023 18:37:46 +0100 Subject: [PATCH 18/23] clippy --- muxers/yamux/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 213bee51027..b93de31c35b 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -317,7 +317,7 @@ impl Config { } }; - f(&mut cfg012); + f(cfg012); self } From a59a3bf70890eb06df83b90c1de970999d5db5d2 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 5 Dec 2023 18:54:08 +0100 Subject: [PATCH 19/23] Remove mut --- muxers/yamux/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index b93de31c35b..2dcd47de254 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -309,7 +309,7 @@ impl Config { } fn set(&mut self, f: impl FnOnce(&mut yamux012::Config) -> &mut yamux012::Config) -> &mut Self { - let mut cfg012 = match self.0.as_mut() { + let cfg012 = match self.0.as_mut() { Either::Left(c) => &mut c.inner, Either::Right(_) => { self.0 = Either::Left(Config012::default()); From 2aef7131bdfe9ca8b1e41b2ed6c69f42e262de12 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 6 Dec 2023 13:18:36 +0100 Subject: [PATCH 20/23] Address review feedback --- muxers/yamux/src/lib.rs | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 2dcd47de254..2b5eb52a11e 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -308,21 +308,7 @@ impl Config { })) } - fn set(&mut self, f: impl FnOnce(&mut yamux012::Config) -> &mut yamux012::Config) -> &mut Self { - let cfg012 = match self.0.as_mut() { - Either::Left(c) => &mut c.inner, - Either::Right(_) => { - self.0 = Either::Left(Config012::default()); - &mut self.0.as_mut().unwrap_left().inner - } - }; - - f(cfg012); - - self - } - - /// Sets the size (in bytes) of the receive w``indow per substream. + /// Sets the size (in bytes) of the receive window per substream. #[deprecated( note = "Will be replaced in the next breaking release with a connection receive window size limit." )] @@ -343,10 +329,26 @@ impl Config { /// Sets the window update mode that determines when the remote /// is given new credit for sending more data. - #[deprecated(note = "Will be removed with the next breaking release.")] + #[deprecated( + note = "`WindowUpdate::OnRead` is the default. `WindowUpdate::OnReceive` breaks backpressure, is thus not recommended, and will be removed in the next breaking release. Thus this method becomes obsolete and will be removed with the next breaking release." + )] pub fn set_window_update_mode(&mut self, mode: WindowUpdateMode) -> &mut Self { self.set(|cfg| cfg.set_window_update_mode(mode.0)) } + + fn set(&mut self, f: impl FnOnce(&mut yamux012::Config) -> &mut yamux012::Config) -> &mut Self { + let cfg012 = match self.0.as_mut() { + Either::Left(c) => &mut c.inner, + Either::Right(_) => { + self.0 = Either::Left(Config012::default()); + &mut self.0.as_mut().unwrap_left().inner + } + }; + + f(cfg012); + + self + } } impl UpgradeInfo for Config { From 3e3c30b2f60ff128e6b3c9dceb67c2918a3e08c2 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 6 Dec 2023 13:32:28 +0100 Subject: [PATCH 21/23] Update to released yamux --- Cargo.lock | 3 ++- Cargo.toml | 1 - protocols/perf/src/bin/perf.rs | 3 +-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a7e459fa1c6..0f244b643a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6867,7 +6867,8 @@ dependencies = [ [[package]] name = "yamux" version = "0.13.0" -source = "git+https://github.com/libp2p/rust-yamux?branch=dynamic-stream-window#6b64a9fad14b6744e817e0fbc92f2c6126461f71" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b70d98d58bf69f92db870a8ce039c0d39af78a3405ff1d28c58ec406b8eea4" dependencies = [ "futures", "log", diff --git a/Cargo.toml b/Cargo.toml index 2fec19af9ad..1e243c418eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -130,7 +130,6 @@ unsigned-varint = { version = "0.8.0" } # we import via `rust-multiaddr`. # This is expected to stay here until we move `libp2p-identity` to a separate repository which makes the dependency relationship more obvious. libp2p-identity = { path = "identity" } -yamux = { git = "https://github.com/libp2p/rust-yamux", branch = "dynamic-stream-window" } [workspace.lints] rust.unreachable_pub = "warn" diff --git a/protocols/perf/src/bin/perf.rs b/protocols/perf/src/bin/perf.rs index 117b1665d93..9ac8f0a6cde 100644 --- a/protocols/perf/src/bin/perf.rs +++ b/protocols/perf/src/bin/perf.rs @@ -72,8 +72,7 @@ impl FromStr for Transport { #[tokio::main] async fn main() -> Result<()> { let _ = tracing_subscriber::fmt() - .with_env_filter(EnvFilter::try_from_default_env().unwrap_or("info,yamux=debug".into())) - .with_writer(std::io::stderr) + .with_env_filter(EnvFilter::from_default_env()) .try_init(); let opts = Opts::parse(); From 2beb89624840de25c72e7a66ce01877c305bbb0d Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 6 Dec 2023 18:55:30 +0100 Subject: [PATCH 22/23] Use instant in yamux --- Cargo.lock | 8 ++++---- Cargo.toml | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 66fb871ebb3..b211cd19d6d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3379,7 +3379,7 @@ dependencies = [ "thiserror", "tracing", "yamux 0.12.1", - "yamux 0.13.0", + "yamux 0.13.1", ] [[package]] @@ -6866,11 +6866,11 @@ dependencies = [ [[package]] name = "yamux" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38b70d98d58bf69f92db870a8ce039c0d39af78a3405ff1d28c58ec406b8eea4" +version = "0.13.1" +source = "git+https://github.com/mxinden/rust-yamux?branch=wasm-instant#3e8d47fbfb09fa636993325c4f633da5b00494a7" dependencies = [ "futures", + "instant", "log", "nohash-hasher", "parking_lot", diff --git a/Cargo.toml b/Cargo.toml index 11ac47b0be4..dc4992e1627 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -130,6 +130,7 @@ unsigned-varint = { version = "0.8.0" } # we import via `rust-multiaddr`. # This is expected to stay here until we move `libp2p-identity` to a separate repository which makes the dependency relationship more obvious. libp2p-identity = { path = "identity" } +yamux = { git = "https://github.com/mxinden/rust-yamux", branch = "wasm-instant" } [workspace.lints] rust.unreachable_pub = "warn" From de3b5ffa95bb5d9cf09e7afd361edf14eebfae94 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 6 Dec 2023 19:10:21 +0100 Subject: [PATCH 23/23] Use released yamux v0.13.1 --- Cargo.lock | 3 ++- Cargo.toml | 1 - muxers/yamux/Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b211cd19d6d..f510267ca06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6867,7 +6867,8 @@ dependencies = [ [[package]] name = "yamux" version = "0.13.1" -source = "git+https://github.com/mxinden/rust-yamux?branch=wasm-instant#3e8d47fbfb09fa636993325c4f633da5b00494a7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad1d0148b89300047e72994bee99ecdabd15a9166a7b70c8b8c37c314dcc9002" dependencies = [ "futures", "instant", diff --git a/Cargo.toml b/Cargo.toml index dc4992e1627..11ac47b0be4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -130,7 +130,6 @@ unsigned-varint = { version = "0.8.0" } # we import via `rust-multiaddr`. # This is expected to stay here until we move `libp2p-identity` to a separate repository which makes the dependency relationship more obvious. libp2p-identity = { path = "identity" } -yamux = { git = "https://github.com/mxinden/rust-yamux", branch = "wasm-instant" } [workspace.lints] rust.unreachable_pub = "warn" diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index 9495da9a131..36601ae56af 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -16,7 +16,7 @@ futures = "0.3.29" libp2p-core = { workspace = true } thiserror = "1.0" yamux012 = { version = "0.12.1", package = "yamux" } -yamux013 = { version = "0.13", package = "yamux" } +yamux013 = { version = "0.13.1", package = "yamux" } tracing = "0.1.37" [dev-dependencies]