transport tests: many streams and lots of data (#2296)
* Add test for lots of data and lots of streams

* Add some logs

* Close hosts

* Remove alloc measurement in many big pings

* Rename fake proto

* Skip windows and add comments

* Fix typo

---------

Co-authored-by: Marten Seemann <[email protected]>
MarcoPolo and marten-seemann authored Jun 15, 2023
1 parent e5676a5 commit 91f34d4
Showing 1 changed file with 195 additions and 2 deletions.
197 changes: 195 additions & 2 deletions p2p/test/transport/transport_test.go
@@ -4,10 +4,14 @@ import (
"bytes"
"context"
"crypto/rand"
"errors"
"fmt"
"io"
"net"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

@@ -151,6 +155,8 @@ func TestPing(t *testing.T) {
t.Run(tc.Name, func(t *testing.T) {
h1 := tc.HostGenerator(t, TransportTestCaseOpts{})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true})
defer h1.Close()
defer h2.Close()

require.NoError(t, h2.Connect(context.Background(), peer.AddrInfo{
ID: h1.ID(),
@@ -178,20 +184,22 @@ func TestBigPing(t *testing.T) {
t.Run(tc.Name, func(t *testing.T) {
h1 := tc.HostGenerator(t, TransportTestCaseOpts{})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true})
defer h1.Close()
defer h2.Close()

require.NoError(t, h2.Connect(context.Background(), peer.AddrInfo{
ID: h1.ID(),
Addrs: h1.Addrs(),
}))

h1.SetStreamHandler("/BIG-ping/1.0.0", func(s network.Stream) {
h1.SetStreamHandler("/big-ping", func(s network.Stream) {
io.Copy(s, s)
s.Close()
})

errCh := make(chan error, 1)
allocs := testing.AllocsPerRun(10, func() {
-s, err := h2.NewStream(context.Background(), h1.ID(), "/BIG-ping/1.0.0")
+s, err := h2.NewStream(context.Background(), h1.ID(), "/big-ping")
require.NoError(t, err)
defer s.Close()

@@ -225,12 +233,87 @@ func TestBigPing(t *testing.T) {
}
}

// TestLotsOfDataManyStreams tests sending a large amount of data over many concurrent streams.
func TestLotsOfDataManyStreams(t *testing.T) {
// Skip on windows because of https://github.com/libp2p/go-libp2p/issues/2341
if runtime.GOOS == "windows" {
t.Skip("Skipping on windows because of https://github.com/libp2p/go-libp2p/issues/2341")
}

// 64k buffer
const bufSize = 64 << 10
sendBuf := [bufSize]byte{}
const totalStreams = 512
const parallel = 8
// Total data sent is > 20 MiB (512 streams × 64 KiB = 32 MiB).
require.Greater(t, len(sendBuf)*totalStreams, 20<<20)
t.Log("Total sends:", len(sendBuf)*totalStreams)

// Fill with random bytes
_, err := rand.Read(sendBuf[:])
require.NoError(t, err)

for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
h1 := tc.HostGenerator(t, TransportTestCaseOpts{})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true})
defer h1.Close()
defer h2.Close()
start := time.Now()
defer func() {
t.Log("Total time:", time.Since(start))
}()

require.NoError(t, h2.Connect(context.Background(), peer.AddrInfo{
ID: h1.ID(),
Addrs: h1.Addrs(),
}))

h1.SetStreamHandler("/big-ping", func(s network.Stream) {
io.Copy(s, s)
s.Close()
})

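// Bound concurrency with a buffered channel used as a counting
// semaphore: at most `parallel` streams are in flight at once.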
sem := make(chan struct{}, parallel)
var wg sync.WaitGroup
for i := 0; i < totalStreams; i++ {
wg.Add(1)
sem <- struct{}{}
go func() {
defer wg.Done()
recvBuf := [bufSize]byte{}
defer func() { <-sem }()

s, err := h2.NewStream(context.Background(), h1.ID(), "/big-ping")
require.NoError(t, err)
defer s.Close()

_, err = s.Write(sendBuf[:])
require.NoError(t, err)
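// Half-close the write side so the echo handler's io.Copy sees EOF and closes the stream.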
s.CloseWrite()

_, err = io.ReadFull(s, recvBuf[:])
require.NoError(t, err)
require.Equal(t, sendBuf, recvBuf)

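// The handler has closed its side, so a further read must return EOF.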
_, err = s.Read([]byte{0})
require.ErrorIs(t, err, io.EOF)
}()
}

wg.Wait()
})
}
}

func TestManyStreams(t *testing.T) {
const streamCount = 128
for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
h1 := tc.HostGenerator(t, TransportTestCaseOpts{NoRcmgr: true})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true, NoRcmgr: true})
defer h1.Close()
defer h2.Close()

require.NoError(t, h2.Connect(context.Background(), peer.AddrInfo{
ID: h1.ID(),
@@ -283,11 +366,117 @@ func TestManyStreams(t *testing.T) {
}
}

// TestMoreStreamsThanOurLimits tests handling more streams than our and the
// peer's resource limits allow. It spawns 1024 goroutines that each try to
// open a stream and send and receive data. If a goroutine encounters an
// error, it retries after a short sleep. If the transport is well behaved,
// every goroutine eventually sends and receives a message.
func TestMoreStreamsThanOurLimits(t *testing.T) {
const streamCount = 1024
for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
if strings.Contains(tc.Name, "mplex") {
t.Skip("fixme: mplex hangs waiting for data.")
}
listener := tc.HostGenerator(t, TransportTestCaseOpts{})
dialer := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true})
defer listener.Close()
defer dialer.Close()

require.NoError(t, dialer.Connect(context.Background(), peer.AddrInfo{
ID: listener.ID(),
Addrs: listener.Addrs(),
}))

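// Count every fully served stream; the test asserts at the end that
// all of them eventually completed despite the resource limits.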
var handledStreams atomic.Int32
listener.SetStreamHandler("echo", func(s network.Stream) {
io.Copy(s, s)
s.Close()
handledStreams.Add(1)
})

wg := sync.WaitGroup{}
wg.Add(streamCount)
errCh := make(chan error, 1)
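// errCh has capacity 1: only the first failure is recorded, and later
// goroutines see len(errCh) > 0 and stop retrying.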
var completedStreams atomic.Int32
for i := 0; i < streamCount; i++ {
go func() {
defer wg.Done()
defer completedStreams.Add(1)

var s network.Stream
var err error
// maxRetries is an arbitrary per-goroutine budget for retrying after errors.
maxRetries := streamCount * 4
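// shouldRetry spends one retry from the budget. Once the budget is
// exhausted, or another goroutine has already reported a failure, it
// records the error (via a non-blocking send) and stops retrying.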
shouldRetry := func(err error) bool {
maxRetries--
if maxRetries == 0 || len(errCh) > 0 {
select {
case errCh <- errors.New("max retries exceeded"):
default:
}
return false
}
return true
}

for {
s, err = dialer.NewStream(context.Background(), listener.ID(), "echo")
if err != nil {
if shouldRetry(err) {
time.Sleep(50 * time.Millisecond)
continue
}
// Out of retries: the failure is already recorded in errCh,
// so bail out instead of using the nil stream below.
return
}
err = func(s network.Stream) error {
defer s.Close()
_, err = s.Write([]byte("hello"))
if err != nil {
return err
}

err = s.CloseWrite()
if err != nil {
return err
}

b, err := io.ReadAll(s)
if err != nil {
return err
}
if !bytes.Equal(b, []byte("hello")) {
return errors.New("received data does not match sent data")
}

return nil
}(s)
if err != nil {
if shouldRetry(err) {
time.Sleep(50 * time.Millisecond)
continue
}
}
return
}
}()
}
wg.Wait()
close(errCh)

require.NoError(t, <-errCh)
require.Equal(t, streamCount, int(handledStreams.Load()))
})
}
}

func TestListenerStreamResets(t *testing.T) {
for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
h1 := tc.HostGenerator(t, TransportTestCaseOpts{})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true})
defer h1.Close()
defer h2.Close()

require.NoError(t, h2.Connect(context.Background(), peer.AddrInfo{
ID: h1.ID(),
@@ -315,6 +504,8 @@ func TestDialerStreamResets(t *testing.T) {
t.Run(tc.Name, func(t *testing.T) {
h1 := tc.HostGenerator(t, TransportTestCaseOpts{})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true})
defer h1.Close()
defer h2.Close()

require.NoError(t, h2.Connect(context.Background(), peer.AddrInfo{
ID: h1.ID(),
@@ -344,6 +535,8 @@ func TestStreamReadDeadline(t *testing.T) {
t.Run(tc.Name, func(t *testing.T) {
h1 := tc.HostGenerator(t, TransportTestCaseOpts{})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true})
defer h1.Close()
defer h2.Close()

require.NoError(t, h2.Connect(context.Background(), peer.AddrInfo{
ID: h1.ID(),