diff --git a/dot/network/inbound.go b/dot/network/inbound.go index 0af6e6c2ee..dce76de654 100644 --- a/dot/network/inbound.go +++ b/dot/network/inbound.go @@ -15,8 +15,9 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder s.streamManager.logNewStream(stream) peer := stream.Conn().RemotePeer() - msgBytes := s.bufPool.get() - defer s.bufPool.put(msgBytes) + buffer := s.bufPool.Get().(*[]byte) + defer s.bufPool.Put(buffer) + msgBytes := *buffer for { n, err := readStream(stream, msgBytes[:]) diff --git a/dot/network/notifications.go b/dot/network/notifications.go index a3b0006557..c55b063186 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -427,11 +427,11 @@ func (s *Service) readHandshake(stream libp2pnetwork.Stream, decoder HandshakeDe hsC := make(chan *handshakeReader) go func() { - msgBytes := s.bufPool.get() - defer func() { - s.bufPool.put(msgBytes) - close(hsC) - }() + defer close(hsC) + + buffer := s.bufPool.Get().(*[]byte) + defer s.bufPool.Put(buffer) + msgBytes := *buffer tot, err := readStream(stream, msgBytes[:]) if err != nil { diff --git a/dot/network/pool.go b/dot/network/pool.go deleted file mode 100644 index c517acf88d..0000000000 --- a/dot/network/pool.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2021 ChainSafe Systems (ON) -// SPDX-License-Identifier: LGPL-3.0-only - -package network - -// sizedBufferPool is a pool of buffers used for reading from streams -type sizedBufferPool struct { - c chan []byte -} - -func newSizedBufferPool(preAllocate, size int) (bp *sizedBufferPool) { - bufferCh := make(chan []byte, size) - - for i := 0; i < preAllocate; i++ { - buf := make([]byte, maxMessageSize) - bufferCh <- buf - } - - return &sizedBufferPool{ - c: bufferCh, - } -} - -// get gets a buffer from the sizedBufferPool, or creates a new one if none are -// available in the pool. Buffers have a pre-allocated capacity. -func (bp *sizedBufferPool) get() (b []byte) { - select { - case b = <-bp.c: - // reuse existing buffer - return b - default: - // create new buffer - return make([]byte, maxMessageSize) - } -} - -// put returns the given buffer to the sizedBufferPool. -func (bp *sizedBufferPool) put(b []byte) { - select { - case bp.c <- b: - default: // Discard the buffer if the pool is full. - } -} diff --git a/dot/network/pool_test.go b/dot/network/pool_test.go deleted file mode 100644 index c0c37a597b..0000000000 --- a/dot/network/pool_test.go +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright 2021 ChainSafe Systems (ON) -// SPDX-License-Identifier: LGPL-3.0-only - -package network - -import ( - "context" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func Benchmark_sizedBufferPool(b *testing.B) { - const preAllocate = 100 - const poolSize = 200 - sbp := newSizedBufferPool(preAllocate, poolSize) - - b.RunParallel(func(p *testing.PB) { - for p.Next() { - buffer := sbp.get() - buffer[0] = 1 - buffer[len(buffer)-1] = 1 - sbp.put(buffer) - } - }) -} - -// Before: 104853 11119 ns/op 65598 B/op 1 allocs/op -// Array ptr: 2742781 438.3 ns/op 2 B/op 0 allocs/op -// Slices: 2560960 463.8 ns/op 2 B/op 0 allocs/op -// Slice pointer: 2683528 460.8 ns/op 2 B/op 0 allocs/op - -func Test_sizedBufferPool(t *testing.T) { - t.Parallel() - - const preAlloc = 1 - const poolSize = 2 - const maxIndex = maxMessageSize - 1 - - pool := newSizedBufferPool(preAlloc, poolSize) - - first := pool.get() // pre-allocated one - first[maxIndex] = 1 - - second := pool.get() // new one - second[maxIndex] = 2 - - third := pool.get() // new one - third[maxIndex] = 3 - - fourth := pool.get() // new one - fourth[maxIndex] = 4 - - pool.put(fourth) - pool.put(third) - pool.put(second) // discarded - pool.put(first) // discarded - - b := pool.get() // fourth - assert.Equal(t, byte(4), b[maxIndex]) - - b = pool.get() // third - assert.Equal(t, byte(3), b[maxIndex]) -} - -func Test_sizedBufferPool_race(t *testing.T) { - t.Parallel() - - const preAlloc = 1 - const poolSize = 2 - - pool := newSizedBufferPool(preAlloc, poolSize) - - const parallelism = 4 - - readyWait := new(sync.WaitGroup) - readyWait.Add(parallelism) - - doneWait := new(sync.WaitGroup) - doneWait.Add(parallelism) - - // run for 50ms - ctxTimerStarted := make(chan struct{}) - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - go func() { - const timeout = 50 * time.Millisecond - readyWait.Wait() - ctx, cancel = context.WithTimeout(ctx, timeout) - close(ctxTimerStarted) - }() - defer cancel() - - for i := 0; i < parallelism; i++ { - go func() { - defer doneWait.Done() - readyWait.Done() - readyWait.Wait() - <-ctxTimerStarted - - for ctx.Err() != nil { - // test relies on the -race detector - // to detect concurrent writes to the buffer. - b := pool.get() - b[0] = 1 - pool.put(b) - } - }() - } - - doneWait.Wait() -} diff --git a/dot/network/service.go b/dot/network/service.go index 363d8ba6ef..cc78982699 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -107,7 +107,7 @@ type Service struct { host *host mdns *mdns gossip *gossip - bufPool *sizedBufferPool + bufPool *sync.Pool streamManager *streamManager notificationsProtocols map[byte]*notificationsProtocol // map of sub-protocol msg ID to protocol info @@ -181,16 +181,12 @@ func NewService(cfg *Config) (*Service, error) { return nil, err } - // pre-allocate pool of buffers used to read from streams. - // initially allocate as many buffers as likely necessary which is the number of inbound streams we will have, - // which should equal the average number of peers times the number of notifications protocols, which is currently 3. - preAllocateInPool := cfg.MinPeers * 3 - poolSize := cfg.MaxPeers * 3 - if cfg.noPreAllocate { // testing - preAllocateInPool = 0 - poolSize = cfg.MinPeers * 3 + bufPool := &sync.Pool{ + New: func() interface{} { + b := make([]byte, maxMessageSize) + return &b + }, } - bufPool := newSizedBufferPool(preAllocateInPool, poolSize) network := &Service{ ctx: ctx,