From 62cf6119be59e169d72f022c5248203d7a79a2f6 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 11 Jul 2015 18:51:47 -0700 Subject: [PATCH 01/31] extracted from go-peerstream --- p2p/muxer/yamux/yamux.go | 94 +++++++++++++++++++++++++++++++++++ p2p/muxer/yamux/yamux_test.go | 11 ++++ 2 files changed, 105 insertions(+) create mode 100644 p2p/muxer/yamux/yamux.go create mode 100644 p2p/muxer/yamux/yamux_test.go diff --git a/p2p/muxer/yamux/yamux.go b/p2p/muxer/yamux/yamux.go new file mode 100644 index 0000000000..c5861d1a28 --- /dev/null +++ b/p2p/muxer/yamux/yamux.go @@ -0,0 +1,94 @@ +package sm_yamux + +import ( + "io/ioutil" + "net" + "time" + + smux "github.com/jbenet/go-stream-mux" + yamux "github.com/jbenet/go-stream-mux/Godeps/_workspace/src/github.com/hashicorp/yamux" +) + +// stream implements smux.Stream using a ss.Stream +type stream yamux.Stream + +func (s *stream) yamuxStream() *yamux.Stream { + return (*yamux.Stream)(s) +} + +func (s *stream) Read(buf []byte) (int, error) { + return s.yamuxStream().Read(buf) +} + +func (s *stream) Write(buf []byte) (int, error) { + return s.yamuxStream().Write(buf) +} + +func (s *stream) Close() error { + return s.yamuxStream().Close() +} + +// Conn is a connection to a remote peer. +type conn yamux.Session + +func (c *conn) yamuxSession() *yamux.Session { + return (*yamux.Session)(c) +} + +func (c *conn) Close() error { + return c.yamuxSession().Close() +} + +func (c *conn) IsClosed() bool { + return c.yamuxSession().IsClosed() +} + +// OpenStream creates a new stream. +func (c *conn) OpenStream() (smux.Stream, error) { + s, err := c.yamuxSession().OpenStream() + if err != nil { + return nil, err + } + + return (*stream)(s), nil +} + +// Serve starts listening for incoming requests and handles them +// using given StreamHandler +func (c *conn) Serve(handler smux.StreamHandler) { + for { // accept loop + s, err := c.yamuxSession().AcceptStream() + if err != nil { + return // err always means closed. + } + go handler((*stream)(s)) + } +} + +// Transport is a go-peerstream transport that constructs +// yamux-backed connections. +type Transport yamux.Config + +// DefaultTransport has default settings for yamux +var DefaultTransport = (*Transport)(&yamux.Config{ + AcceptBacklog: 256, // from yamux.DefaultConfig + EnableKeepAlive: true, // from yamux.DefaultConfig + KeepAliveInterval: 30 * time.Second, // from yamux.DefaultConfig + MaxStreamWindowSize: uint32(256 * 1024), // from yamux.DefaultConfig + LogOutput: ioutil.Discard, +}) + +func (t *Transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) { + var s *yamux.Session + var err error + if isServer { + s, err = yamux.Server(nc, t.Config()) + } else { + s, err = yamux.Client(nc, t.Config()) + } + return (*conn)(s), err +} + +func (t *Transport) Config() *yamux.Config { + return (*yamux.Config)(t) +} diff --git a/p2p/muxer/yamux/yamux_test.go b/p2p/muxer/yamux/yamux_test.go new file mode 100644 index 0000000000..2a319570c2 --- /dev/null +++ b/p2p/muxer/yamux/yamux_test.go @@ -0,0 +1,11 @@ +package sm_yamux + +import ( + "testing" + + test "github.com/jbenet/go-stream-mux/test" +) + +func TestYamuxTransport(t *testing.T) { + test.SubtestAll(t, DefaultTransport) +} From de05b59cdf2cec0cc706775bd3fbf9558c6882b8 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 11 Jul 2015 19:17:39 -0700 Subject: [PATCH 02/31] fixed urls --- p2p/muxer/yamux/yamux.go | 2 +- p2p/muxer/yamux/yamux_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/muxer/yamux/yamux.go b/p2p/muxer/yamux/yamux.go index c5861d1a28..4e6794fc56 100644 --- a/p2p/muxer/yamux/yamux.go +++ b/p2p/muxer/yamux/yamux.go @@ -6,7 +6,7 @@ import ( "time" smux "github.com/jbenet/go-stream-mux" - yamux "github.com/jbenet/go-stream-mux/Godeps/_workspace/src/github.com/hashicorp/yamux" + yamux "github.com/jbenet/go-stream-muxer/Godeps/_workspace/src/github.com/hashicorp/yamux" ) // stream implements smux.Stream using a ss.Stream diff --git a/p2p/muxer/yamux/yamux_test.go b/p2p/muxer/yamux/yamux_test.go index 2a319570c2..f95409ec24 100644 --- a/p2p/muxer/yamux/yamux_test.go +++ b/p2p/muxer/yamux/yamux_test.go @@ -3,7 +3,7 @@ package sm_yamux import ( "testing" - test "github.com/jbenet/go-stream-mux/test" + test "github.com/jbenet/go-stream-muxer/test" ) func TestYamuxTransport(t *testing.T) { From 59832ab002103f730060d260d570fb9514e1eb4e Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 11 Jul 2015 19:39:04 -0700 Subject: [PATCH 03/31] links fix --- p2p/muxer/yamux/yamux.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/muxer/yamux/yamux.go b/p2p/muxer/yamux/yamux.go index 4e6794fc56..0f3a76f0a8 100644 --- a/p2p/muxer/yamux/yamux.go +++ b/p2p/muxer/yamux/yamux.go @@ -5,7 +5,7 @@ import ( "net" "time" - smux "github.com/jbenet/go-stream-mux" + smux "github.com/jbenet/go-stream-muxer" yamux "github.com/jbenet/go-stream-muxer/Godeps/_workspace/src/github.com/hashicorp/yamux" ) From d42b66198c20332f6f4fc95fdd048f813c9e6962 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 11 Jul 2015 20:55:24 -0700 Subject: [PATCH 04/31] implement AcceptStream --- p2p/muxer/yamux/yamux.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/p2p/muxer/yamux/yamux.go b/p2p/muxer/yamux/yamux.go index 0f3a76f0a8..0e13843878 100644 --- a/p2p/muxer/yamux/yamux.go +++ b/p2p/muxer/yamux/yamux.go @@ -53,15 +53,21 @@ func (c *conn) OpenStream() (smux.Stream, error) { return (*stream)(s), nil } +// AcceptStream accepts a stream opened by the other side. +func (c *conn) AcceptStream() (smux.Stream, error) { + s, err := c.yamuxSession().AcceptStream() + return (*stream)(s), err +} + // Serve starts listening for incoming requests and handles them // using given StreamHandler func (c *conn) Serve(handler smux.StreamHandler) { for { // accept loop - s, err := c.yamuxSession().AcceptStream() + s, err := c.AcceptStream() if err != nil { return // err always means closed. } - go handler((*stream)(s)) + go handler(s) } } From 4332c8da95e04a75e5b6269d0dee02884c90a301 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 2 May 2016 15:24:02 -0700 Subject: [PATCH 05/31] gxify and update deps --- p2p/muxer/yamux/yamux.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/p2p/muxer/yamux/yamux.go b/p2p/muxer/yamux/yamux.go index 0e13843878..a6abb6f043 100644 --- a/p2p/muxer/yamux/yamux.go +++ b/p2p/muxer/yamux/yamux.go @@ -5,8 +5,8 @@ import ( "net" "time" + yamux "github.com/hashicorp/yamux" smux "github.com/jbenet/go-stream-muxer" - yamux "github.com/jbenet/go-stream-muxer/Godeps/_workspace/src/github.com/hashicorp/yamux" ) // stream implements smux.Stream using a ss.Stream @@ -77,11 +77,12 @@ type Transport yamux.Config // DefaultTransport has default settings for yamux var DefaultTransport = (*Transport)(&yamux.Config{ - AcceptBacklog: 256, // from yamux.DefaultConfig - EnableKeepAlive: true, // from yamux.DefaultConfig - KeepAliveInterval: 30 * time.Second, // from yamux.DefaultConfig - MaxStreamWindowSize: uint32(256 * 1024), // from yamux.DefaultConfig - LogOutput: ioutil.Discard, + AcceptBacklog: 256, // from yamux.DefaultConfig + EnableKeepAlive: true, // from yamux.DefaultConfig + KeepAliveInterval: 30 * time.Second, // from yamux.DefaultConfig + ConnectionWriteTimeout: 10 * time.Second, // from yamux.DefaultConfig + MaxStreamWindowSize: uint32(256 * 1024), // from yamux.DefaultConfig + LogOutput: ioutil.Discard, }) func (t *Transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) { From 3a964c3cbfc261ed2e9af715384f8efe7edb29f4 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 4 May 2016 15:51:32 -0700 Subject: [PATCH 06/31] rewrite paths to gx versions --- p2p/muxer/yamux/yamux.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/muxer/yamux/yamux.go b/p2p/muxer/yamux/yamux.go index a6abb6f043..86d722be8e 100644 --- a/p2p/muxer/yamux/yamux.go +++ b/p2p/muxer/yamux/yamux.go @@ -5,8 +5,8 @@ import ( "net" "time" - yamux "github.com/hashicorp/yamux" smux "github.com/jbenet/go-stream-muxer" + yamux "gx/ipfs/QmT8nkh6VVJ2fWgAshstDkDWssAY1EXBhoraqGDfGeVx9Q/yamux" ) // stream implements smux.Stream using a ss.Stream From c6d96a9aa0650b351f063836abb0b98b6d29e593 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 13 May 2016 09:15:25 -0700 Subject: [PATCH 07/31] gx publish version 1.0.0 --- p2p/muxer/yamux/yamux.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/muxer/yamux/yamux.go b/p2p/muxer/yamux/yamux.go index 86d722be8e..a6abb6f043 100644 --- a/p2p/muxer/yamux/yamux.go +++ b/p2p/muxer/yamux/yamux.go @@ -5,8 +5,8 @@ import ( "net" "time" + yamux "github.com/hashicorp/yamux" smux "github.com/jbenet/go-stream-muxer" - yamux "gx/ipfs/QmT8nkh6VVJ2fWgAshstDkDWssAY1EXBhoraqGDfGeVx9Q/yamux" ) // stream implements smux.Stream using a ss.Stream From 2fc370b3884ac0ac5a198b4c9e94966719eb0726 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 6 Nov 2016 21:39:54 -0800 Subject: [PATCH 08/31] add deadline methods to stream --- p2p/muxer/yamux/yamux.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/p2p/muxer/yamux/yamux.go b/p2p/muxer/yamux/yamux.go index a6abb6f043..6f62aa8ac2 100644 --- a/p2p/muxer/yamux/yamux.go +++ b/p2p/muxer/yamux/yamux.go @@ -10,6 +10,7 @@ import ( ) // stream implements smux.Stream using a ss.Stream +// TODO: do we actually need a wrapper here? type stream yamux.Stream func (s *stream) yamuxStream() *yamux.Stream { @@ -28,6 +29,18 @@ func (s *stream) Close() error { return s.yamuxStream().Close() } +func (s *stream) SetDeadline(t time.Time) error { + return s.yamuxStream().SetDeadline(t) +} + +func (s *stream) SetReadDeadline(t time.Time) error { + return s.yamuxStream().SetReadDeadline(t) +} + +func (s *stream) SetWriteDeadline(t time.Time) error { + return s.yamuxStream().SetWriteDeadline(t) +} + // Conn is a connection to a remote peer. type conn yamux.Session From bae0cbcb2f98c888beb549b9d6b3198782ef7aed Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 16 Jul 2017 13:42:31 -0700 Subject: [PATCH 09/31] cleanup and change import --- p2p/muxer/yamux/yamux.go | 38 +++----------------------------------- 1 file changed, 3 insertions(+), 35 deletions(-) diff --git a/p2p/muxer/yamux/yamux.go b/p2p/muxer/yamux/yamux.go index 6f62aa8ac2..a8877c213b 100644 --- a/p2p/muxer/yamux/yamux.go +++ b/p2p/muxer/yamux/yamux.go @@ -5,42 +5,10 @@ import ( "net" "time" - yamux "github.com/hashicorp/yamux" smux "github.com/jbenet/go-stream-muxer" + yamux "github.com/whyrusleeping/yamux" ) -// stream implements smux.Stream using a ss.Stream -// TODO: do we actually need a wrapper here? -type stream yamux.Stream - -func (s *stream) yamuxStream() *yamux.Stream { - return (*yamux.Stream)(s) -} - -func (s *stream) Read(buf []byte) (int, error) { - return s.yamuxStream().Read(buf) -} - -func (s *stream) Write(buf []byte) (int, error) { - return s.yamuxStream().Write(buf) -} - -func (s *stream) Close() error { - return s.yamuxStream().Close() -} - -func (s *stream) SetDeadline(t time.Time) error { - return s.yamuxStream().SetDeadline(t) -} - -func (s *stream) SetReadDeadline(t time.Time) error { - return s.yamuxStream().SetReadDeadline(t) -} - -func (s *stream) SetWriteDeadline(t time.Time) error { - return s.yamuxStream().SetWriteDeadline(t) -} - // Conn is a connection to a remote peer. type conn yamux.Session @@ -63,13 +31,13 @@ func (c *conn) OpenStream() (smux.Stream, error) { return nil, err } - return (*stream)(s), nil + return s, nil } // AcceptStream accepts a stream opened by the other side. func (c *conn) AcceptStream() (smux.Stream, error) { s, err := c.yamuxSession().AcceptStream() - return (*stream)(s), err + return s, err } // Serve starts listening for incoming requests and handles them From 3576d5ec8ef85d0e04ea78078ac411b7a4013727 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 18 Aug 2017 14:04:32 -0700 Subject: [PATCH 10/31] go-stream-muxer has moved to libp2p --- p2p/muxer/yamux/yamux.go | 2 +- p2p/muxer/yamux/yamux_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/muxer/yamux/yamux.go b/p2p/muxer/yamux/yamux.go index a8877c213b..6fbef11bbd 100644 --- a/p2p/muxer/yamux/yamux.go +++ b/p2p/muxer/yamux/yamux.go @@ -5,7 +5,7 @@ import ( "net" "time" - smux "github.com/jbenet/go-stream-muxer" + smux "github.com/libp2p/go-stream-muxer" yamux "github.com/whyrusleeping/yamux" ) diff --git a/p2p/muxer/yamux/yamux_test.go b/p2p/muxer/yamux/yamux_test.go index f95409ec24..fbd0188b8d 100644 --- a/p2p/muxer/yamux/yamux_test.go +++ b/p2p/muxer/yamux/yamux_test.go @@ -3,7 +3,7 @@ package sm_yamux import ( "testing" - test "github.com/jbenet/go-stream-muxer/test" + test "github.com/libp2p/go-stream-muxer/test" ) func TestYamuxTransport(t *testing.T) { From a258c5039fee4159afe9b657decde3bd49af822c Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 19 Jul 2017 11:53:57 +0700 Subject: [PATCH 11/31] remove conn.Serve This method was recently removed from the interface. --- p2p/muxer/yamux/yamux.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/p2p/muxer/yamux/yamux.go b/p2p/muxer/yamux/yamux.go index 6fbef11bbd..7a602d0ba6 100644 --- a/p2p/muxer/yamux/yamux.go +++ b/p2p/muxer/yamux/yamux.go @@ -40,18 +40,6 @@ func (c *conn) AcceptStream() (smux.Stream, error) { return s, err } -// Serve starts listening for incoming requests and handles them -// using given StreamHandler -func (c *conn) Serve(handler smux.StreamHandler) { - for { // accept loop - s, err := c.AcceptStream() - if err != nil { - return // err always means closed. - } - go handler(s) - } -} - // Transport is a go-peerstream transport that constructs // yamux-backed connections. type Transport yamux.Config From c31fdfddc79a5e7b79c55235f9470a442987941d Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 25 Sep 2018 15:18:08 -0700 Subject: [PATCH 12/31] bump the stream window size to 1MiB work towards https://github.com/libp2p/go-libp2p/issues/435 --- p2p/muxer/yamux/yamux.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/p2p/muxer/yamux/yamux.go b/p2p/muxer/yamux/yamux.go index 7a602d0ba6..12212eeb8f 100644 --- a/p2p/muxer/yamux/yamux.go +++ b/p2p/muxer/yamux/yamux.go @@ -46,12 +46,17 @@ type Transport yamux.Config // DefaultTransport has default settings for yamux var DefaultTransport = (*Transport)(&yamux.Config{ - AcceptBacklog: 256, // from yamux.DefaultConfig - EnableKeepAlive: true, // from yamux.DefaultConfig - KeepAliveInterval: 30 * time.Second, // from yamux.DefaultConfig - ConnectionWriteTimeout: 10 * time.Second, // from yamux.DefaultConfig - MaxStreamWindowSize: uint32(256 * 1024), // from yamux.DefaultConfig - LogOutput: ioutil.Discard, + AcceptBacklog: 256, // from yamux.DefaultConfig + EnableKeepAlive: true, // from yamux.DefaultConfig + KeepAliveInterval: 30 * time.Second, // from yamux.DefaultConfig + ConnectionWriteTimeout: 10 * time.Second, // from yamux.DefaultConfig + // We've bumped this to 1MiB as this critically limits throughput. + // + // 1MiB means a best case of 10MiB/s (83.89Mbps) on a connection with + // 100ms latency. The default gave us 2.4MiB *best case* which was + // totally unacceptable. + MaxStreamWindowSize: uint32(1024 * 1024), + LogOutput: ioutil.Discard, }) func (t *Transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) { From 444e14b8788b4b40412f72fe71774bc199347b73 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 5 Mar 2019 12:11:48 +0200 Subject: [PATCH 13/31] bump window size to 16MiB --- p2p/muxer/yamux/yamux.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/muxer/yamux/yamux.go b/p2p/muxer/yamux/yamux.go index 12212eeb8f..05940af6d8 100644 --- a/p2p/muxer/yamux/yamux.go +++ b/p2p/muxer/yamux/yamux.go @@ -50,12 +50,12 @@ var DefaultTransport = (*Transport)(&yamux.Config{ EnableKeepAlive: true, // from yamux.DefaultConfig KeepAliveInterval: 30 * time.Second, // from yamux.DefaultConfig ConnectionWriteTimeout: 10 * time.Second, // from yamux.DefaultConfig - // We've bumped this to 1MiB as this critically limits throughput. + // We've bumped this to 16MiB as this critically limits throughput. // // 1MiB means a best case of 10MiB/s (83.89Mbps) on a connection with // 100ms latency. The default gave us 2.4MiB *best case* which was // totally unacceptable. - MaxStreamWindowSize: uint32(1024 * 1024), + MaxStreamWindowSize: uint32(16 * 1024 * 1024), LogOutput: ioutil.Discard, }) From 7a70e760a154bb03ec884472df05ac7e3c4182e5 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 21 May 2019 23:05:12 +0300 Subject: [PATCH 14/31] copy default config, initialize coalesce delay --- p2p/muxer/yamux/yamux.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/p2p/muxer/yamux/yamux.go b/p2p/muxer/yamux/yamux.go index 05940af6d8..edc95cf87f 100644 --- a/p2p/muxer/yamux/yamux.go +++ b/p2p/muxer/yamux/yamux.go @@ -44,20 +44,20 @@ func (c *conn) AcceptStream() (smux.Stream, error) { // yamux-backed connections. type Transport yamux.Config -// DefaultTransport has default settings for yamux -var DefaultTransport = (*Transport)(&yamux.Config{ - AcceptBacklog: 256, // from yamux.DefaultConfig - EnableKeepAlive: true, // from yamux.DefaultConfig - KeepAliveInterval: 30 * time.Second, // from yamux.DefaultConfig - ConnectionWriteTimeout: 10 * time.Second, // from yamux.DefaultConfig +var DefaultTransport *Transport + +func init() { + config := yamux.DefaultConfig() // We've bumped this to 16MiB as this critically limits throughput. // // 1MiB means a best case of 10MiB/s (83.89Mbps) on a connection with // 100ms latency. The default gave us 2.4MiB *best case* which was // totally unacceptable. - MaxStreamWindowSize: uint32(16 * 1024 * 1024), - LogOutput: ioutil.Discard, -}) + config.MaxStreamWindowSize = uint32(16 * 1024 * 1024) + // don't spam + config.LogOutput = ioutil.Discard + DefaultTransport = (*Transport)(config) +} func (t *Transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) { var s *yamux.Session From 88c1e0ecf78a5788a9b7effbb5733d0326faac9d Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 21 May 2019 14:46:20 -0700 Subject: [PATCH 15/31] fix: unused import --- p2p/muxer/yamux/yamux.go | 1 - 1 file changed, 1 deletion(-) diff --git a/p2p/muxer/yamux/yamux.go b/p2p/muxer/yamux/yamux.go index edc95cf87f..03a3230814 100644 --- a/p2p/muxer/yamux/yamux.go +++ b/p2p/muxer/yamux/yamux.go @@ -3,7 +3,6 @@ package sm_yamux import ( "io/ioutil" "net" - "time" smux "github.com/libp2p/go-stream-muxer" yamux "github.com/whyrusleeping/yamux" From c35c511ccc5e410632a421f15ecfbaa9b2924973 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 21 May 2019 18:41:01 -0700 Subject: [PATCH 16/31] migrate to libp2p org --- p2p/muxer/yamux/yamux.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/muxer/yamux/yamux.go b/p2p/muxer/yamux/yamux.go index 03a3230814..965c8fb47c 100644 --- a/p2p/muxer/yamux/yamux.go +++ b/p2p/muxer/yamux/yamux.go @@ -5,7 +5,7 @@ import ( "net" smux "github.com/libp2p/go-stream-muxer" - yamux "github.com/whyrusleeping/yamux" + yamux "github.com/libp2p/go-yamux" ) // Conn is a connection to a remote peer. From a59b684e6d404d17529475c28efbc75ff8698a28 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 22 May 2019 13:19:06 -0700 Subject: [PATCH 17/31] fix: don't buffer reads This doesn't help when running over a security transport and allocates a bunch of memory. --- p2p/muxer/yamux/yamux.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/p2p/muxer/yamux/yamux.go b/p2p/muxer/yamux/yamux.go index 965c8fb47c..55e45bdde3 100644 --- a/p2p/muxer/yamux/yamux.go +++ b/p2p/muxer/yamux/yamux.go @@ -55,6 +55,9 @@ func init() { config.MaxStreamWindowSize = uint32(16 * 1024 * 1024) // don't spam config.LogOutput = ioutil.Discard + // We always run over a security transport that buffers internally + // (i.e., uses a block cipher). + config.ReadBufSize = 0 DefaultTransport = (*Transport)(config) } From cb81de4268bcfea5c2cd63b3baa6144b51478c5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Sun, 26 May 2019 15:44:42 +0100 Subject: [PATCH 18/31] migrate to consolidated types; add travis config. (#2) --- p2p/muxer/yamux/yamux.go | 8 ++++---- p2p/muxer/yamux/yamux_test.go | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/p2p/muxer/yamux/yamux.go b/p2p/muxer/yamux/yamux.go index 55e45bdde3..4875761cf4 100644 --- a/p2p/muxer/yamux/yamux.go +++ b/p2p/muxer/yamux/yamux.go @@ -4,7 +4,7 @@ import ( "io/ioutil" "net" - smux "github.com/libp2p/go-stream-muxer" + mux "github.com/libp2p/go-libp2p-core/mux" yamux "github.com/libp2p/go-yamux" ) @@ -24,7 +24,7 @@ func (c *conn) IsClosed() bool { } // OpenStream creates a new stream. -func (c *conn) OpenStream() (smux.Stream, error) { +func (c *conn) OpenStream() (mux.MuxedStream, error) { s, err := c.yamuxSession().OpenStream() if err != nil { return nil, err @@ -34,7 +34,7 @@ func (c *conn) OpenStream() (smux.Stream, error) { } // AcceptStream accepts a stream opened by the other side. -func (c *conn) AcceptStream() (smux.Stream, error) { +func (c *conn) AcceptStream() (mux.MuxedStream, error) { s, err := c.yamuxSession().AcceptStream() return s, err } @@ -61,7 +61,7 @@ func init() { DefaultTransport = (*Transport)(config) } -func (t *Transport) NewConn(nc net.Conn, isServer bool) (smux.Conn, error) { +func (t *Transport) NewConn(nc net.Conn, isServer bool) (mux.MuxedConn, error) { var s *yamux.Session var err error if isServer { diff --git a/p2p/muxer/yamux/yamux_test.go b/p2p/muxer/yamux/yamux_test.go index fbd0188b8d..409b0967c8 100644 --- a/p2p/muxer/yamux/yamux_test.go +++ b/p2p/muxer/yamux/yamux_test.go @@ -3,9 +3,9 @@ package sm_yamux import ( "testing" - test "github.com/libp2p/go-stream-muxer/test" + tmux "github.com/libp2p/go-libp2p-testing/suites/mux" ) func TestYamuxTransport(t *testing.T) { - test.SubtestAll(t, DefaultTransport) + tmux.SubtestAll(t, DefaultTransport) } From 2a0c987b076b9728765161b9acccbdcbc570b4cc Mon Sep 17 00:00:00 2001 From: Hlib Date: Fri, 28 Feb 2020 17:55:09 +0200 Subject: [PATCH 19/31] restructure files layout --- p2p/muxer/yamux/conn.go | 41 ++++++++++ p2p/muxer/yamux/mux.go | 48 ++++++++++++ .../yamux/{yamux_test.go => mux_test.go} | 2 +- p2p/muxer/yamux/yamux.go | 77 ------------------- 4 files changed, 90 insertions(+), 78 deletions(-) create mode 100644 p2p/muxer/yamux/conn.go create mode 100644 p2p/muxer/yamux/mux.go rename p2p/muxer/yamux/{yamux_test.go => mux_test.go} (75%) delete mode 100644 p2p/muxer/yamux/yamux.go diff --git a/p2p/muxer/yamux/conn.go b/p2p/muxer/yamux/conn.go new file mode 100644 index 0000000000..6c39cc2035 --- /dev/null +++ b/p2p/muxer/yamux/conn.go @@ -0,0 +1,41 @@ +package sm_yamux + +import ( + "github.com/libp2p/go-libp2p-core/mux" + "github.com/libp2p/go-yamux" +) + +// conn implements mux.MuxedConn over yamux.Session. +type conn yamux.Session + +// Close closes underlying yamux +func (c *conn) Close() error { + return c.yamux().Close() +} + +// IsClosed checks if yamux.Session is in closed state. +func (c *conn) IsClosed() bool { + return c.yamux().IsClosed() +} + +// OpenStream creates a new stream. +func (c *conn) OpenStream() (mux.MuxedStream, error) { + s, err := c.yamux().OpenStream() + if err != nil { + return nil, err + } + + return (*stream)(s), nil +} + +// AcceptStream accepts a stream opened by the other side. +func (c *conn) AcceptStream() (mux.MuxedStream, error) { + s, err := c.yamux().AcceptStream() + return (*stream)(s), err +} + +func (c *conn) yamux() *yamux.Session { + return (*yamux.Session)(c) +} + +var _ mux.MuxedConn = &conn{} diff --git a/p2p/muxer/yamux/mux.go b/p2p/muxer/yamux/mux.go new file mode 100644 index 0000000000..f0c1af2995 --- /dev/null +++ b/p2p/muxer/yamux/mux.go @@ -0,0 +1,48 @@ +package sm_yamux + +import ( + "io/ioutil" + "net" + + mux "github.com/libp2p/go-libp2p-core/mux" + yamux "github.com/libp2p/go-yamux" +) + +var DefaultTransport *Multiplexer + +func init() { + config := yamux.DefaultConfig() + // We've bumped this to 16MiB as this critically limits throughput. + // + // 1MiB means a best case of 10MiB/s (83.89Mbps) on a connection with + // 100ms latency. The default gave us 2.4MiB *best case* which was + // totally unacceptable. + config.MaxStreamWindowSize = uint32(16 * 1024 * 1024) + // don't spam + config.LogOutput = ioutil.Discard + // We always run over a security transport that buffers internally + // (i.e., uses a block cipher). + config.ReadBufSize = 0 + DefaultTransport = (*Multiplexer)(config) +} + +// Multiplexer implements mux.Multiplexer that constructs +// yamux-backed muxed connections. +type Multiplexer yamux.Config + +func (t *Multiplexer) NewConn(nc net.Conn, isServer bool) (mux.MuxedConn, error) { + var s *yamux.Session + var err error + if isServer { + s, err = yamux.Server(nc, t.Config()) + } else { + s, err = yamux.Client(nc, t.Config()) + } + return (*conn)(s), err +} + +func (t *Multiplexer) Config() *yamux.Config { + return (*yamux.Config)(t) +} + +var _ mux.Multiplexer = &Multiplexer{} diff --git a/p2p/muxer/yamux/yamux_test.go b/p2p/muxer/yamux/mux_test.go similarity index 75% rename from p2p/muxer/yamux/yamux_test.go rename to p2p/muxer/yamux/mux_test.go index 409b0967c8..d3a9b4ff79 100644 --- a/p2p/muxer/yamux/yamux_test.go +++ b/p2p/muxer/yamux/mux_test.go @@ -6,6 +6,6 @@ import ( tmux "github.com/libp2p/go-libp2p-testing/suites/mux" ) -func TestYamuxTransport(t *testing.T) { +func TestDefaultMultiplexer(t *testing.T) { tmux.SubtestAll(t, DefaultTransport) } diff --git a/p2p/muxer/yamux/yamux.go b/p2p/muxer/yamux/yamux.go deleted file mode 100644 index 4875761cf4..0000000000 --- a/p2p/muxer/yamux/yamux.go +++ /dev/null @@ -1,77 +0,0 @@ -package sm_yamux - -import ( - "io/ioutil" - "net" - - mux "github.com/libp2p/go-libp2p-core/mux" - yamux "github.com/libp2p/go-yamux" -) - -// Conn is a connection to a remote peer. -type conn yamux.Session - -func (c *conn) yamuxSession() *yamux.Session { - return (*yamux.Session)(c) -} - -func (c *conn) Close() error { - return c.yamuxSession().Close() -} - -func (c *conn) IsClosed() bool { - return c.yamuxSession().IsClosed() -} - -// OpenStream creates a new stream. -func (c *conn) OpenStream() (mux.MuxedStream, error) { - s, err := c.yamuxSession().OpenStream() - if err != nil { - return nil, err - } - - return s, nil -} - -// AcceptStream accepts a stream opened by the other side. -func (c *conn) AcceptStream() (mux.MuxedStream, error) { - s, err := c.yamuxSession().AcceptStream() - return s, err -} - -// Transport is a go-peerstream transport that constructs -// yamux-backed connections. -type Transport yamux.Config - -var DefaultTransport *Transport - -func init() { - config := yamux.DefaultConfig() - // We've bumped this to 16MiB as this critically limits throughput. - // - // 1MiB means a best case of 10MiB/s (83.89Mbps) on a connection with - // 100ms latency. The default gave us 2.4MiB *best case* which was - // totally unacceptable. - config.MaxStreamWindowSize = uint32(16 * 1024 * 1024) - // don't spam - config.LogOutput = ioutil.Discard - // We always run over a security transport that buffers internally - // (i.e., uses a block cipher). - config.ReadBufSize = 0 - DefaultTransport = (*Transport)(config) -} - -func (t *Transport) NewConn(nc net.Conn, isServer bool) (mux.MuxedConn, error) { - var s *yamux.Session - var err error - if isServer { - s, err = yamux.Server(nc, t.Config()) - } else { - s, err = yamux.Client(nc, t.Config()) - } - return (*conn)(s), err -} - -func (t *Transport) Config() *yamux.Config { - return (*yamux.Config)(t) -} From df4c879cb3372f80bae83ab0feed371f62aa4007 Mon Sep 17 00:00:00 2001 From: Hlib Date: Fri, 28 Feb 2020 17:56:44 +0200 Subject: [PATCH 20/31] redefine yamux stream to implement mux.MuxedStream and to respect mux.ErrReset --- p2p/muxer/yamux/stream.go | 55 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 p2p/muxer/yamux/stream.go diff --git a/p2p/muxer/yamux/stream.go b/p2p/muxer/yamux/stream.go new file mode 100644 index 0000000000..866b781fbe --- /dev/null +++ b/p2p/muxer/yamux/stream.go @@ -0,0 +1,55 @@ +package sm_yamux + +import ( + "time" + + "github.com/libp2p/go-libp2p-core/mux" + "github.com/libp2p/go-yamux" +) + +// stream implements mux.MuxedStream over yamux.Stream. +type stream yamux.Stream + +func (s *stream) Read(b []byte) (n int, err error) { + n, err = s.yamux().Read(b) + if err == yamux.ErrConnectionReset { + err = mux.ErrReset + } + + return +} + +func (s *stream) Write(b []byte) (n int, err error) { + n, err = s.yamux().Write(b) + if err == yamux.ErrConnectionReset { + err = mux.ErrReset + } + + return +} + +func (s *stream) Close() error { + return s.yamux().Close() +} + +func (s *stream) Reset() error { + return s.yamux().Reset() +} + +func (s *stream) SetDeadline(t time.Time) error { + return s.yamux().SetDeadline(t) +} + +func (s *stream) SetReadDeadline(t time.Time) error { + return s.yamux().SetReadDeadline(t) +} + +func (s *stream) SetWriteDeadline(t time.Time) error { + return s.yamux().SetWriteDeadline(t) +} + +func (s *stream) yamux() *yamux.Stream { + return (*yamux.Stream)(s) +} + +var _ mux.MuxedStream = &stream{} From f5d58efcaee5afd8f5baceb37af9b563d932e2c4 Mon Sep 17 00:00:00 2001 From: Hlib Date: Fri, 28 Feb 2020 20:12:16 +0200 Subject: [PATCH 21/31] revert Multiplexer name back to Transport --- p2p/muxer/yamux/{mux.go => transport.go} | 14 +++++++------- p2p/muxer/yamux/{mux_test.go => transport_test.go} | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) rename p2p/muxer/yamux/{mux.go => transport.go} (72%) rename p2p/muxer/yamux/{mux_test.go => transport_test.go} (75%) diff --git a/p2p/muxer/yamux/mux.go b/p2p/muxer/yamux/transport.go similarity index 72% rename from p2p/muxer/yamux/mux.go rename to p2p/muxer/yamux/transport.go index f0c1af2995..9b33f81440 100644 --- a/p2p/muxer/yamux/mux.go +++ b/p2p/muxer/yamux/transport.go @@ -8,7 +8,7 @@ import ( yamux "github.com/libp2p/go-yamux" ) -var DefaultTransport *Multiplexer +var DefaultTransport *Transport func init() { config := yamux.DefaultConfig() @@ -23,14 +23,14 @@ func init() { // We always run over a security transport that buffers internally // (i.e., uses a block cipher). config.ReadBufSize = 0 - DefaultTransport = (*Multiplexer)(config) + DefaultTransport = (*Transport)(config) } -// Multiplexer implements mux.Multiplexer that constructs +// Transport implements mux.Multiplexer that constructs // yamux-backed muxed connections. -type Multiplexer yamux.Config +type Transport yamux.Config -func (t *Multiplexer) NewConn(nc net.Conn, isServer bool) (mux.MuxedConn, error) { +func (t *Transport) NewConn(nc net.Conn, isServer bool) (mux.MuxedConn, error) { var s *yamux.Session var err error if isServer { @@ -41,8 +41,8 @@ func (t *Multiplexer) NewConn(nc net.Conn, isServer bool) (mux.MuxedConn, error) return (*conn)(s), err } -func (t *Multiplexer) Config() *yamux.Config { +func (t *Transport) Config() *yamux.Config { return (*yamux.Config)(t) } -var _ mux.Multiplexer = &Multiplexer{} +var _ mux.Multiplexer = &Transport{} diff --git a/p2p/muxer/yamux/mux_test.go b/p2p/muxer/yamux/transport_test.go similarity index 75% rename from p2p/muxer/yamux/mux_test.go rename to p2p/muxer/yamux/transport_test.go index d3a9b4ff79..0499971a64 100644 --- a/p2p/muxer/yamux/mux_test.go +++ b/p2p/muxer/yamux/transport_test.go @@ -6,6 +6,6 @@ import ( tmux "github.com/libp2p/go-libp2p-testing/suites/mux" ) -func TestDefaultMultiplexer(t *testing.T) { +func TestDefaultTransport(t *testing.T) { tmux.SubtestAll(t, DefaultTransport) } From 3416b29bcd544443127861ab411645f06ca446c8 Mon Sep 17 00:00:00 2001 From: Hlib Date: Mon, 2 Mar 2020 12:32:40 +0200 Subject: [PATCH 22/31] use explicit return values --- p2p/muxer/yamux/stream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/muxer/yamux/stream.go b/p2p/muxer/yamux/stream.go index 866b781fbe..9f6e4f9a7a 100644 --- a/p2p/muxer/yamux/stream.go +++ b/p2p/muxer/yamux/stream.go @@ -16,7 +16,7 @@ func (s *stream) Read(b []byte) (n int, err error) { err = mux.ErrReset } - return + return n, err } func (s *stream) Write(b []byte) (n int, err error) { @@ -25,7 +25,7 @@ func (s *stream) Write(b []byte) (n int, err error) { err = mux.ErrReset } - return + return n, err } func (s *stream) Close() error { From 846e4efb4bbbd6774238b0825f7e954a665599eb Mon Sep 17 00:00:00 2001 From: Hlib Date: Tue, 3 Mar 2020 12:19:56 +0200 Subject: [PATCH 23/31] update go-yamux --- p2p/muxer/yamux/stream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/muxer/yamux/stream.go b/p2p/muxer/yamux/stream.go index 9f6e4f9a7a..d5b865b777 100644 --- a/p2p/muxer/yamux/stream.go +++ b/p2p/muxer/yamux/stream.go @@ -12,7 +12,7 @@ type stream yamux.Stream func (s *stream) Read(b []byte) (n int, err error) { n, err = s.yamux().Read(b) - if err == yamux.ErrConnectionReset { + if err == yamux.ErrStreamReset { err = mux.ErrReset } @@ -21,7 +21,7 @@ func (s *stream) Read(b []byte) (n int, err error) { func (s *stream) Write(b []byte) (n int, err error) { n, err = s.yamux().Write(b) - if err == yamux.ErrConnectionReset { + if err == yamux.ErrStreamReset { err = mux.ErrReset } From aa1de9acde321f692550cf81624b8c8372dcb950 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 1 Sep 2020 18:23:14 -0700 Subject: [PATCH 24/31] feat: update to new stream interfaces This change updates to the latest stream interfaces. --- p2p/muxer/yamux/stream.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/p2p/muxer/yamux/stream.go b/p2p/muxer/yamux/stream.go index d5b865b777..f51b721b3b 100644 --- a/p2p/muxer/yamux/stream.go +++ b/p2p/muxer/yamux/stream.go @@ -36,6 +36,14 @@ func (s *stream) Reset() error { return s.yamux().Reset() } +func (s *stream) CloseRead() error { + return s.yamux().CloseRead() +} + +func (s *stream) CloseWrite() error { + return s.yamux().CloseWrite() +} + func (s *stream) SetDeadline(t time.Time) error { return s.yamux().SetDeadline(t) } From d916bc65468e4bbc2370086d861f9a49a044be07 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 17 Dec 2020 16:12:01 +0700 Subject: [PATCH 25/31] change OpenStream to accept a context --- p2p/muxer/yamux/conn.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/p2p/muxer/yamux/conn.go b/p2p/muxer/yamux/conn.go index 6c39cc2035..4301d46633 100644 --- a/p2p/muxer/yamux/conn.go +++ b/p2p/muxer/yamux/conn.go @@ -1,6 +1,8 @@ package sm_yamux import ( + "context" + "github.com/libp2p/go-libp2p-core/mux" "github.com/libp2p/go-yamux" ) @@ -19,7 +21,7 @@ func (c *conn) IsClosed() bool { } // OpenStream creates a new stream. -func (c *conn) OpenStream() (mux.MuxedStream, error) { +func (c *conn) OpenStream(context.Context) (mux.MuxedStream, error) { s, err := c.yamux().OpenStream() if err != nil { return nil, err From 07602f6890f0b4e48f12f31487dd28df7e9f3c67 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sat, 19 Dec 2020 15:45:52 +0700 Subject: [PATCH 26/31] update go-yamux to v2.0.0, use context passed to OpenStream --- p2p/muxer/yamux/conn.go | 6 +++--- p2p/muxer/yamux/stream.go | 2 +- p2p/muxer/yamux/transport.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/p2p/muxer/yamux/conn.go b/p2p/muxer/yamux/conn.go index 4301d46633..78ba09c7ba 100644 --- a/p2p/muxer/yamux/conn.go +++ b/p2p/muxer/yamux/conn.go @@ -4,7 +4,7 @@ import ( "context" "github.com/libp2p/go-libp2p-core/mux" - "github.com/libp2p/go-yamux" + "github.com/libp2p/go-yamux/v2" ) // conn implements mux.MuxedConn over yamux.Session. @@ -21,8 +21,8 @@ func (c *conn) IsClosed() bool { } // OpenStream creates a new stream. -func (c *conn) OpenStream(context.Context) (mux.MuxedStream, error) { - s, err := c.yamux().OpenStream() +func (c *conn) OpenStream(ctx context.Context) (mux.MuxedStream, error) { + s, err := c.yamux().OpenStream(ctx) if err != nil { return nil, err } diff --git a/p2p/muxer/yamux/stream.go b/p2p/muxer/yamux/stream.go index f51b721b3b..6d34184e0f 100644 --- a/p2p/muxer/yamux/stream.go +++ b/p2p/muxer/yamux/stream.go @@ -4,7 +4,7 @@ import ( "time" "github.com/libp2p/go-libp2p-core/mux" - "github.com/libp2p/go-yamux" + "github.com/libp2p/go-yamux/v2" ) // stream implements mux.MuxedStream over yamux.Stream. diff --git a/p2p/muxer/yamux/transport.go b/p2p/muxer/yamux/transport.go index 9b33f81440..7c2c01100c 100644 --- a/p2p/muxer/yamux/transport.go +++ b/p2p/muxer/yamux/transport.go @@ -5,7 +5,7 @@ import ( "net" mux "github.com/libp2p/go-libp2p-core/mux" - yamux "github.com/libp2p/go-yamux" + "github.com/libp2p/go-yamux/v2" ) var DefaultTransport *Transport From 6f1591092563ad0c6a0527499caa585130232fc9 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 10 Dec 2021 13:51:15 +0400 Subject: [PATCH 27/31] reduce the number of max incoming stream to 256 go-yamux sets a default of 1000. --- p2p/muxer/yamux/transport.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/p2p/muxer/yamux/transport.go b/p2p/muxer/yamux/transport.go index 7c2c01100c..71315ec894 100644 --- a/p2p/muxer/yamux/transport.go +++ b/p2p/muxer/yamux/transport.go @@ -4,7 +4,7 @@ import ( "io/ioutil" "net" - mux "github.com/libp2p/go-libp2p-core/mux" + "github.com/libp2p/go-libp2p-core/mux" "github.com/libp2p/go-yamux/v2" ) @@ -23,6 +23,7 @@ func init() { // We always run over a security transport that buffers internally // (i.e., uses a block cipher). config.ReadBufSize = 0 + config.MaxIncomingStreams = 256 DefaultTransport = (*Transport)(config) } From 090ba44e543f286f5b373f3cace50802f1b79a2b Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 10 Dec 2021 18:43:37 +0400 Subject: [PATCH 28/31] disable the 1000 streams test --- p2p/muxer/yamux/transport_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/p2p/muxer/yamux/transport_test.go b/p2p/muxer/yamux/transport_test.go index 0499971a64..48136c48d2 100644 --- a/p2p/muxer/yamux/transport_test.go +++ b/p2p/muxer/yamux/transport_test.go @@ -7,5 +7,9 @@ import ( ) func TestDefaultTransport(t *testing.T) { + // Yamux doesn't have any backpressure when it comes to opening streams. + // If the peer opens too many streams, those are just reset. + delete(tmux.Subtests, "github.com/libp2p/go-libp2p-testing/suites/mux.SubtestStress1Conn1000Stream10Msg") + tmux.SubtestAll(t, DefaultTransport) } From ba15ca1e6d34a36b694136888afad06fdb802e83 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Mon, 17 Jan 2022 03:34:24 -0800 Subject: [PATCH 29/31] pass the PeerScope to yamux (satifiying its MemoryManger interface) (#46) --- p2p/muxer/yamux/conn.go | 13 +++++++------ p2p/muxer/yamux/stream.go | 13 +++++++------ p2p/muxer/yamux/transport.go | 15 ++++++++------- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/p2p/muxer/yamux/conn.go b/p2p/muxer/yamux/conn.go index 78ba09c7ba..5be7ed362d 100644 --- a/p2p/muxer/yamux/conn.go +++ b/p2p/muxer/yamux/conn.go @@ -3,13 +3,16 @@ package sm_yamux import ( "context" - "github.com/libp2p/go-libp2p-core/mux" - "github.com/libp2p/go-yamux/v2" + "github.com/libp2p/go-libp2p-core/network" + + "github.com/libp2p/go-yamux/v3" ) // conn implements mux.MuxedConn over yamux.Session. type conn yamux.Session +var _ network.MuxedConn = &conn{} + // Close closes underlying yamux func (c *conn) Close() error { return c.yamux().Close() @@ -21,7 +24,7 @@ func (c *conn) IsClosed() bool { } // OpenStream creates a new stream. -func (c *conn) OpenStream(ctx context.Context) (mux.MuxedStream, error) { +func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) { s, err := c.yamux().OpenStream(ctx) if err != nil { return nil, err @@ -31,7 +34,7 @@ func (c *conn) OpenStream(ctx context.Context) (mux.MuxedStream, error) { } // AcceptStream accepts a stream opened by the other side. -func (c *conn) AcceptStream() (mux.MuxedStream, error) { +func (c *conn) AcceptStream() (network.MuxedStream, error) { s, err := c.yamux().AcceptStream() return (*stream)(s), err } @@ -39,5 +42,3 @@ func (c *conn) AcceptStream() (mux.MuxedStream, error) { func (c *conn) yamux() *yamux.Session { return (*yamux.Session)(c) } - -var _ mux.MuxedConn = &conn{} diff --git a/p2p/muxer/yamux/stream.go b/p2p/muxer/yamux/stream.go index 6d34184e0f..1a93916a8c 100644 --- a/p2p/muxer/yamux/stream.go +++ b/p2p/muxer/yamux/stream.go @@ -3,17 +3,20 @@ package sm_yamux import ( "time" - "github.com/libp2p/go-libp2p-core/mux" - "github.com/libp2p/go-yamux/v2" + "github.com/libp2p/go-libp2p-core/network" + + "github.com/libp2p/go-yamux/v3" ) // stream implements mux.MuxedStream over yamux.Stream. type stream yamux.Stream +var _ network.MuxedStream = &stream{} + func (s *stream) Read(b []byte) (n int, err error) { n, err = s.yamux().Read(b) if err == yamux.ErrStreamReset { - err = mux.ErrReset + err = network.ErrReset } return n, err @@ -22,7 +25,7 @@ func (s *stream) Read(b []byte) (n int, err error) { func (s *stream) Write(b []byte) (n int, err error) { n, err = s.yamux().Write(b) if err == yamux.ErrStreamReset { - err = mux.ErrReset + err = network.ErrReset } return n, err @@ -59,5 +62,3 @@ func (s *stream) SetWriteDeadline(t time.Time) error { func (s *stream) yamux() *yamux.Stream { return (*yamux.Stream)(s) } - -var _ mux.MuxedStream = &stream{} diff --git a/p2p/muxer/yamux/transport.go b/p2p/muxer/yamux/transport.go index 71315ec894..a7a1fec1b6 100644 --- a/p2p/muxer/yamux/transport.go +++ b/p2p/muxer/yamux/transport.go @@ -4,8 +4,9 @@ import ( "io/ioutil" "net" - "github.com/libp2p/go-libp2p-core/mux" - "github.com/libp2p/go-yamux/v2" + "github.com/libp2p/go-libp2p-core/network" + + "github.com/libp2p/go-yamux/v3" ) var DefaultTransport *Transport @@ -31,13 +32,15 @@ func init() { // yamux-backed muxed connections. type Transport yamux.Config -func (t *Transport) NewConn(nc net.Conn, isServer bool) (mux.MuxedConn, error) { +var _ network.Multiplexer = &Transport{} + +func (t *Transport) NewConn(nc net.Conn, isServer bool, scope network.PeerScope) (network.MuxedConn, error) { var s *yamux.Session var err error if isServer { - s, err = yamux.Server(nc, t.Config()) + s, err = yamux.Server(nc, t.Config(), scope) } else { - s, err = yamux.Client(nc, t.Config()) + s, err = yamux.Client(nc, t.Config(), scope) } return (*conn)(s), err } @@ -45,5 +48,3 @@ func (t *Transport) NewConn(nc net.Conn, isServer bool) (mux.MuxedConn, error) { func (t *Transport) Config() *yamux.Config { return (*yamux.Config)(t) } - -var _ mux.Multiplexer = &Transport{} From af478a90b0bce8d777c1e15735f8760a29f96ca2 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Tue, 8 Feb 2022 13:43:41 +0530 Subject: [PATCH 30/31] disable the incoming streams limit (#49) This is now handled by the resource manager. --- p2p/muxer/yamux/transport.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/p2p/muxer/yamux/transport.go b/p2p/muxer/yamux/transport.go index a7a1fec1b6..ea2cd0e562 100644 --- a/p2p/muxer/yamux/transport.go +++ b/p2p/muxer/yamux/transport.go @@ -2,6 +2,7 @@ package sm_yamux import ( "io/ioutil" + "math" "net" "github.com/libp2p/go-libp2p-core/network" @@ -24,7 +25,9 @@ func init() { // We always run over a security transport that buffers internally // (i.e., uses a block cipher). config.ReadBufSize = 0 - config.MaxIncomingStreams = 256 + // Effectively disable the incoming streams limit. + // This is now dynamically limited by the resource manager. + config.MaxIncomingStreams = math.MaxUint32 DefaultTransport = (*Transport)(config) } From f3bdb4b53c62b77ce7e90ee2ae7a5f73a900cf3c Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Fri, 22 Apr 2022 18:07:10 +0100 Subject: [PATCH 31/31] switch from github.com/libp2p/go-libp2p-yamux to p2p/muxer/yamux --- config/muxer_test.go | 3 +-- defaults.go | 2 +- go.mod | 4 ++-- p2p/net/swarm/dial_worker_test.go | 2 +- p2p/net/swarm/testing/testing.go | 2 +- 5 files changed, 6 insertions(+), 7 deletions(-) diff --git a/config/muxer_test.go b/config/muxer_test.go index 76aada1061..96de6e1f11 100644 --- a/config/muxer_test.go +++ b/config/muxer_test.go @@ -4,13 +4,12 @@ import ( "testing" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" + yamux "github.com/libp2p/go-libp2p/p2p/muxer/yamux" swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" - - yamux "github.com/libp2p/go-libp2p-yamux" ) func TestMuxerSimple(t *testing.T) { diff --git a/defaults.go b/defaults.go index 4c881114aa..72ee19cb51 100644 --- a/defaults.go +++ b/defaults.go @@ -5,6 +5,7 @@ package libp2p import ( "crypto/rand" + yamux "github.com/libp2p/go-libp2p/p2p/muxer/yamux" "github.com/libp2p/go-libp2p/p2p/net/connmgr" quic "github.com/libp2p/go-libp2p/p2p/transport/quic" "github.com/libp2p/go-libp2p/p2p/transport/tcp" @@ -16,7 +17,6 @@ import ( "github.com/libp2p/go-libp2p-peerstore/pstoremem" rcmgr "github.com/libp2p/go-libp2p-resource-manager" tls "github.com/libp2p/go-libp2p-tls" - yamux "github.com/libp2p/go-libp2p-yamux" "github.com/multiformats/go-multiaddr" ) diff --git a/go.mod b/go.mod index 13441b6d41..6a804cb9a2 100644 --- a/go.mod +++ b/go.mod @@ -28,12 +28,12 @@ require ( github.com/libp2p/go-libp2p-testing v0.9.2 github.com/libp2p/go-libp2p-tls v0.4.1 github.com/libp2p/go-libp2p-transport-upgrader v0.7.1 - github.com/libp2p/go-libp2p-yamux v0.9.1 github.com/libp2p/go-msgio v0.2.0 github.com/libp2p/go-netroute v0.2.0 github.com/libp2p/go-reuseport v0.1.0 github.com/libp2p/go-reuseport-transport v0.1.0 github.com/libp2p/go-stream-muxer-multistream v0.4.0 + github.com/libp2p/go-yamux/v3 v3.1.1 github.com/libp2p/zeroconf/v2 v2.1.1 github.com/lucas-clemente/quic-go v0.27.0 github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd @@ -85,11 +85,11 @@ require ( github.com/libp2p/go-libp2p-pnet v0.2.0 // indirect github.com/libp2p/go-libp2p-quic-transport v0.17.0 // indirect github.com/libp2p/go-libp2p-swarm v0.10.2 // indirect + github.com/libp2p/go-libp2p-yamux v0.9.1 // indirect github.com/libp2p/go-mplex v0.4.0 // indirect github.com/libp2p/go-nat v0.1.0 // indirect github.com/libp2p/go-openssl v0.0.7 // indirect github.com/libp2p/go-tcp-transport v0.5.1 // indirect - github.com/libp2p/go-yamux/v3 v3.1.1 // indirect github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect github.com/marten-seemann/qtls-go1-17 v0.1.1 // indirect github.com/marten-seemann/qtls-go1-18 v0.1.1 // indirect diff --git a/p2p/net/swarm/dial_worker_test.go b/p2p/net/swarm/dial_worker_test.go index 2fec019a8e..b90092c540 100644 --- a/p2p/net/swarm/dial_worker_test.go +++ b/p2p/net/swarm/dial_worker_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + yamux "github.com/libp2p/go-libp2p/p2p/muxer/yamux" quic "github.com/libp2p/go-libp2p/p2p/transport/quic" "github.com/libp2p/go-libp2p/p2p/transport/tcp" @@ -19,7 +20,6 @@ import ( "github.com/libp2p/go-libp2p-peerstore/pstoremem" tnet "github.com/libp2p/go-libp2p-testing/net" tptu "github.com/libp2p/go-libp2p-transport-upgrader" - yamux "github.com/libp2p/go-libp2p-yamux" msmux "github.com/libp2p/go-stream-muxer-multistream" ma "github.com/multiformats/go-multiaddr" diff --git a/p2p/net/swarm/testing/testing.go b/p2p/net/swarm/testing/testing.go index c529e814b7..70359b0258 100644 --- a/p2p/net/swarm/testing/testing.go +++ b/p2p/net/swarm/testing/testing.go @@ -4,6 +4,7 @@ import ( "testing" "time" + yamux "github.com/libp2p/go-libp2p/p2p/muxer/yamux" "github.com/libp2p/go-libp2p/p2p/net/swarm" quic "github.com/libp2p/go-libp2p/p2p/transport/quic" "github.com/libp2p/go-libp2p/p2p/transport/tcp" @@ -22,7 +23,6 @@ import ( "github.com/libp2p/go-libp2p-peerstore/pstoremem" tnet "github.com/libp2p/go-libp2p-testing/net" tptu "github.com/libp2p/go-libp2p-transport-upgrader" - yamux "github.com/libp2p/go-libp2p-yamux" msmux "github.com/libp2p/go-stream-muxer-multistream" ma "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require"