diff --git a/p2p/test/transport/transport_test.go b/p2p/test/transport/transport_test.go index 22215d548d..7ef1d5b867 100644 --- a/p2p/test/transport/transport_test.go +++ b/p2p/test/transport/transport_test.go @@ -4,10 +4,14 @@ import ( "bytes" "context" "crypto/rand" + "errors" "fmt" "io" "net" + "runtime" + "strings" "sync" + "sync/atomic" "testing" "time" @@ -151,6 +155,8 @@ func TestPing(t *testing.T) { t.Run(tc.Name, func(t *testing.T) { h1 := tc.HostGenerator(t, TransportTestCaseOpts{}) h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true}) + defer h1.Close() + defer h2.Close() require.NoError(t, h2.Connect(context.Background(), peer.AddrInfo{ ID: h1.ID(), @@ -178,20 +184,22 @@ func TestBigPing(t *testing.T) { t.Run(tc.Name, func(t *testing.T) { h1 := tc.HostGenerator(t, TransportTestCaseOpts{}) h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true}) + defer h1.Close() + defer h2.Close() require.NoError(t, h2.Connect(context.Background(), peer.AddrInfo{ ID: h1.ID(), Addrs: h1.Addrs(), })) - h1.SetStreamHandler("/BIG-ping/1.0.0", func(s network.Stream) { + h1.SetStreamHandler("/big-ping", func(s network.Stream) { io.Copy(s, s) s.Close() }) errCh := make(chan error, 1) allocs := testing.AllocsPerRun(10, func() { - s, err := h2.NewStream(context.Background(), h1.ID(), "/BIG-ping/1.0.0") + s, err := h2.NewStream(context.Background(), h1.ID(), "/big-ping") require.NoError(t, err) defer s.Close() @@ -225,12 +233,87 @@ func TestBigPing(t *testing.T) { } } +// TestLotsOfDataManyStreams tests sending a lot of data on multiple streams. +func TestLotsOfDataManyStreams(t *testing.T) { + // Skip on windows because of https://github.com/libp2p/go-libp2p/issues/2341 + if runtime.GOOS == "windows" { + t.Skip("Skipping on windows because of https://github.com/libp2p/go-libp2p/issues/2341") + } + + // 64k buffer + const bufSize = 64 << 10 + sendBuf := [bufSize]byte{} + const totalStreams = 512 + const parallel = 8 + // Total sends are > 20MiB + require.Greater(t, len(sendBuf)*totalStreams, 20<<20) + t.Log("Total sends:", len(sendBuf)*totalStreams) + + // Fill with random bytes + _, err := rand.Read(sendBuf[:]) + require.NoError(t, err) + + for _, tc := range transportsToTest { + t.Run(tc.Name, func(t *testing.T) { + h1 := tc.HostGenerator(t, TransportTestCaseOpts{}) + h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true}) + defer h1.Close() + defer h2.Close() + start := time.Now() + defer func() { + t.Log("Total time:", time.Since(start)) + }() + + require.NoError(t, h2.Connect(context.Background(), peer.AddrInfo{ + ID: h1.ID(), + Addrs: h1.Addrs(), + })) + + h1.SetStreamHandler("/big-ping", func(s network.Stream) { + io.Copy(s, s) + s.Close() + }) + + sem := make(chan struct{}, parallel) + var wg sync.WaitGroup + for i := 0; i < totalStreams; i++ { + wg.Add(1) + sem <- struct{}{} + go func() { + defer wg.Done() + recvBuf := [bufSize]byte{} + defer func() { <-sem }() + + s, err := h2.NewStream(context.Background(), h1.ID(), "/big-ping") + require.NoError(t, err) + defer s.Close() + + _, err = s.Write(sendBuf[:]) + require.NoError(t, err) + s.CloseWrite() + + _, err = io.ReadFull(s, recvBuf[:]) + require.NoError(t, err) + require.Equal(t, sendBuf, recvBuf) + + _, err = s.Read([]byte{0}) + require.ErrorIs(t, err, io.EOF) + }() + } + + wg.Wait() + }) + } +} + func TestManyStreams(t *testing.T) { const streamCount = 128 for _, tc := range transportsToTest { t.Run(tc.Name, func(t *testing.T) { h1 := tc.HostGenerator(t, TransportTestCaseOpts{NoRcmgr: true}) h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true, NoRcmgr: true}) + defer h1.Close() + defer h2.Close() require.NoError(t, h2.Connect(context.Background(), peer.AddrInfo{ ID: h1.ID(), @@ -283,11 +366,117 @@ func TestManyStreams(t *testing.T) { } } +// TestMoreStreamsThanOurLimits tests handling more streams than our and the +// peer's resource limits. It spawns 1024 Go routines that try to open a stream +// and send and receive data. If they encounter an error they'll try again after +// a sleep. If the transport is well behaved, eventually all Go routines will +// have sent and received a message. +func TestMoreStreamsThanOurLimits(t *testing.T) { + const streamCount = 1024 + for _, tc := range transportsToTest { + t.Run(tc.Name, func(t *testing.T) { + if strings.Contains(tc.Name, "mplex") { + t.Skip("fixme: mplex hangs on waiting for data.") + return + } + listener := tc.HostGenerator(t, TransportTestCaseOpts{}) + dialer := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true}) + defer listener.Close() + defer dialer.Close() + + require.NoError(t, dialer.Connect(context.Background(), peer.AddrInfo{ + ID: listener.ID(), + Addrs: listener.Addrs(), + })) + + var handledStreams atomic.Int32 + listener.SetStreamHandler("echo", func(s network.Stream) { + io.Copy(s, s) + s.Close() + handledStreams.Add(1) + }) + + wg := sync.WaitGroup{} + wg.Add(streamCount) + errCh := make(chan error, 1) + var completedStreams atomic.Int32 + for i := 0; i < streamCount; i++ { + go func() { + defer wg.Done() + defer completedStreams.Add(1) + + var s network.Stream + var err error + // maxRetries is an arbitrary retry amount if there's any error. + maxRetries := streamCount * 4 + shouldRetry := func(err error) bool { + maxRetries-- + if maxRetries == 0 || len(errCh) > 0 { + select { + case errCh <- errors.New("max retries exceeded"): + default: + } + return false + } + return true + } + + for { + s, err = dialer.NewStream(context.Background(), listener.ID(), "echo") + if err != nil { + if shouldRetry(err) { + time.Sleep(50 * time.Millisecond) + continue + } + } + err = func(s network.Stream) error { + defer s.Close() + _, err = s.Write([]byte("hello")) + if err != nil { + return err + } + + err = s.CloseWrite() + if err != nil { + return err + } + + b, err := io.ReadAll(s) + if err != nil { + return err + } + if !bytes.Equal(b, []byte("hello")) { + return errors.New("received data does not match sent data") + } + + return nil + }(s) + if err != nil { + if shouldRetry(err) { + time.Sleep(50 * time.Millisecond) + continue + } + } + return + } + }() + } + wg.Wait() + close(errCh) + + require.NoError(t, <-errCh) + require.Equal(t, streamCount, int(handledStreams.Load())) + }) + } +} + func TestListenerStreamResets(t *testing.T) { for _, tc := range transportsToTest { t.Run(tc.Name, func(t *testing.T) { h1 := tc.HostGenerator(t, TransportTestCaseOpts{}) h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true}) + defer h1.Close() + defer h2.Close() require.NoError(t, h2.Connect(context.Background(), peer.AddrInfo{ ID: h1.ID(), @@ -315,6 +504,8 @@ func TestDialerStreamResets(t *testing.T) { t.Run(tc.Name, func(t *testing.T) { h1 := tc.HostGenerator(t, TransportTestCaseOpts{}) h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true}) + defer h1.Close() + defer h2.Close() require.NoError(t, h2.Connect(context.Background(), peer.AddrInfo{ ID: h1.ID(), @@ -344,6 +535,8 @@ func TestStreamReadDeadline(t *testing.T) { t.Run(tc.Name, func(t *testing.T) { h1 := tc.HostGenerator(t, TransportTestCaseOpts{}) h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true}) + defer h1.Close() + defer h2.Close() require.NoError(t, h2.Connect(context.Background(), peer.AddrInfo{ ID: h1.ID(),