Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - p2p: server: adjust deadline during long reads and writes #5463

Closed
wants to merge 9 commits into from
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ configuration is as follows:
* [#5467](https://github.com/spacemeshos/go-spacemesh/pull/5467)
Fix a bug that could cause ATX sync to stall because of exhausted limit of concurrent requests for dependencies.
Fetching dependencies of an ATX is not limited anymore.
* [#5463](https://github.com/spacemeshos/go-spacemesh/pull/5463)
Adjust deadline during long reads and writes, reducing "i/o deadline exceeded" errors.

## Release v1.3.3

Expand Down
3 changes: 3 additions & 0 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type Config struct {
BatchSize, QueueSize int
MaxRetriesForRequest int
RequestTimeout time.Duration `mapstructure:"request-timeout"`
RequestHardTimeout time.Duration `mapstructure:"request-hard-timeout"`
EnableServerMetrics bool `mapstructure:"servers-metrics"`
ServersConfig map[string]ServerConfig `mapstructure:"servers"`
PeersRateThreshold float64 `mapstructure:"peers-rate-threshold"`
Expand All @@ -127,6 +128,7 @@ func DefaultConfig() Config {
QueueSize: 20,
BatchSize: 10,
RequestTimeout: 25 * time.Second,
RequestHardTimeout: 5 * time.Minute,
MaxRetriesForRequest: 100,
ServersConfig: map[string]ServerConfig{
// serves 1 MB of data
Expand Down Expand Up @@ -287,6 +289,7 @@ func (f *Fetch) registerServer(
) {
opts := []server.Opt{
server.WithTimeout(f.cfg.RequestTimeout),
server.WithHardTimeout(f.cfg.RequestHardTimeout),
server.WithLog(f.logger),
}
if f.cfg.EnableServerMetrics {
Expand Down
4 changes: 3 additions & 1 deletion fetch/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func createFetch(tb testing.TB) *testFetch {
BatchSize: 3,
QueueSize: 1000,
RequestTimeout: 3 * time.Second,
RequestHardTimeout: 10 * time.Second,
MaxRetriesForRequest: 3,
GetAtxsConcurrency: DefaultConfig().GetAtxsConcurrency,
}
Expand Down Expand Up @@ -335,7 +336,8 @@ func TestFetch_PeerDroppedWhenMessageResultsInValidationReject(t *testing.T) {
BatchTimeout: 2000 * time.Minute, // make sure we never hit the batch timeout
BatchSize: 3,
QueueSize: 1000,
RequestTimeout: time.Second * time.Duration(3),
RequestTimeout: 3 * time.Second,
RequestHardTimeout: 10 * time.Second,
MaxRetriesForRequest: 3,
}
p2pconf := p2p.DefaultConfig()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/libp2p/go-libp2p-pubsub v0.10.0
github.com/libp2p/go-libp2p-record v0.2.0
github.com/libp2p/go-msgio v0.3.0
github.com/libp2p/go-yamux/v4 v4.0.1
github.com/mitchellh/mapstructure v1.5.0
github.com/multiformats/go-multiaddr v0.12.1
github.com/multiformats/go-varint v0.0.7
Expand Down Expand Up @@ -144,7 +145,6 @@ require (
github.com/libp2p/go-nat v0.2.0 // indirect
github.com/libp2p/go-netroute v0.2.1 // indirect
github.com/libp2p/go-reuseport v0.4.0 // indirect
github.com/libp2p/go-yamux/v4 v4.0.1 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
Expand Down
5 changes: 3 additions & 2 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1320,9 +1320,10 @@ func getTestDefaultConfig(tb testing.TB) *config.Config {
cfg.DataDirParent = tmp
cfg.FileLock = filepath.Join(tmp, "LOCK")

cfg.FETCH.RequestTimeout = 10
cfg.FETCH.RequestTimeout = 10 * time.Second
cfg.FETCH.RequestHardTimeout = 20 * time.Second
cfg.FETCH.BatchSize = 5
cfg.FETCH.BatchTimeout = 5
cfg.FETCH.BatchTimeout = 5 * time.Second

cfg.Beacon = beacon.NodeSimUnitTestConfig()

Expand Down
130 changes: 130 additions & 0 deletions p2p/server/deadline_adjuster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package server

import (
"context"
"errors"
"fmt"
"time"

"github.com/jonboulle/clockwork"
"github.com/libp2p/go-yamux/v4"
)

// deadlineAdjusterChunkSize is the default chunk size, in bytes, assigned to
// deadlineAdjuster.chunkSize by newDeadlineAdjuster. It bounds how many bytes
// are passed to a single underlying Read/Write call and how far the byte
// counters must advance before the stream deadline is adjusted again.
const deadlineAdjusterChunkSize = 4096

// deadlineAdjuster wraps a peerStream and keeps moving its I/O deadline
// forward while data is being transferred, up to a hard deadline.
type deadlineAdjuster struct {
	// peerStream is the wrapped stream; Read and Write are overridden,
	// the remaining methods are promoted as-is.
	peerStream

	// desc prefixes the augmented timeout error messages.
	desc string
	// clock is the time source used by adjust and augmentError.
	clock clockwork.Clock
	// start is the creation time, used to report elapsed time in errors.
	start time.Time

	// timeout is added to the current time whenever the deadline is moved.
	// hardTimeout is added to the time of the first adjust call to obtain
	// hardDeadline.
	timeout, hardTimeout time.Duration
	// hardDeadline is the absolute limit for the stream deadline; it stays
	// zero until the first adjust call.
	hardDeadline time.Time

	// chunkSize is the maximum number of bytes handed to one underlying
	// Read/Write call and the step by which nextAdjust* thresholds advance.
	chunkSize int
	// totalRead and totalWritten count the bytes transferred so far.
	totalRead, totalWritten int
	// nextAdjustRead and nextAdjustWrite are the byte counts that totalRead
	// and totalWritten must exceed before the deadline is adjusted again.
	nextAdjustRead, nextAdjustWrite int
}

// newDeadlineAdjuster wraps stream in a deadlineAdjuster.
//
// desc is used as the prefix of augmented timeout errors, timeout is the
// amount by which the deadline is pushed forward on each adjustment and
// hardTimeout bounds the total lifetime counted from the first adjustment.
// The adjuster starts with a real clock and the default chunk size.
func newDeadlineAdjuster(stream peerStream, desc string, timeout, hardTimeout time.Duration) *deadlineAdjuster {
	dadj := new(deadlineAdjuster)
	dadj.peerStream = stream
	dadj.desc = desc
	dadj.timeout = timeout
	dadj.hardTimeout = hardTimeout
	dadj.start = time.Now()
	dadj.clock = clockwork.NewRealClock()
	dadj.chunkSize = deadlineAdjusterChunkSize
	// -1 makes the very first adjust call set a deadline even though no
	// bytes have been transferred yet (0 > -1).
	dadj.nextAdjustRead = -1
	dadj.nextAdjustWrite = -1
	return dadj
}

// augmentError adds transfer statistics to timeout errors.
//
// Errors that match neither context.DeadlineExceeded nor yamux.ErrTimeout
// (including nil) are returned unchanged. Timeout errors are wrapped with the
// stream description, the operation name passed in what, the time elapsed
// since the adjuster was created, the byte counters and both timeouts.
func (dadj *deadlineAdjuster) augmentError(what string, err error) error {
	isTimeout := errors.Is(err, context.DeadlineExceeded) || errors.Is(err, yamux.ErrTimeout)
	if !isTimeout {
		return err
	}
	elapsed := dadj.clock.Now().Sub(dadj.start)
	return fmt.Errorf(
		"%s: %s after %v, %d bytes read, %d bytes written, timeout %v, hard timeout %v: %w",
		dadj.desc, what, elapsed,
		dadj.totalRead, dadj.totalWritten,
		dadj.timeout, dadj.hardTimeout,
		err,
	)
}
poszu marked this conversation as resolved.
Show resolved Hide resolved

// adjust moves the stream deadline forward if enough data has been
// transferred since the previous adjustment.
//
// The first call records hardDeadline as now + hardTimeout. Any later call
// made when the clock has reached hardDeadline returns yamux.ErrTimeout
// without touching the stream. Otherwise the deadline is set to
// now + timeout, capped at hardDeadline, and nil is returned.
func (dadj *deadlineAdjuster) adjust() error {
now := dadj.clock.Now()
if dadj.hardDeadline.IsZero() {
dadj.hardDeadline = now.Add(dadj.hardTimeout)
} else if !now.Before(dadj.hardDeadline) {
ivan4th marked this conversation as resolved.
Show resolved Hide resolved
// emulate yamux timeout error
return yamux.ErrTimeout
}
// Do not adjust the deadline too often
// (only when a byte counter has passed its threshold; the threshold is
// then advanced by chunkSize past the current counter value)
adj := false
if dadj.totalRead > dadj.nextAdjustRead {
dadj.nextAdjustRead = dadj.totalRead + dadj.chunkSize
adj = true
}
if dadj.totalWritten > dadj.nextAdjustWrite {
dadj.nextAdjustWrite = dadj.totalWritten + dadj.chunkSize
adj = true
}
if adj {
// We ignore the error returned by SetDeadline b/c the call
// doesn't work for mock hosts
deadline := now.Add(dadj.timeout)
// Never extend the deadline beyond the hard deadline.
if deadline.After(dadj.hardDeadline) {
_ = dadj.SetDeadline(dadj.hardDeadline)
} else {
_ = dadj.SetDeadline(deadline)
}
}

return nil
}

// Read fills p from the underlying stream in pieces of at most chunkSize
// bytes, calling adjust before every underlying read so the deadline keeps
// up with the transfer.
//
// It returns the number of bytes placed in p. Reading stops as soon as an
// underlying read returns fewer bytes than requested or an error occurs.
// Errors from adjust and from the stream go through augmentError.
func (dadj *deadlineAdjuster) Read(p []byte) (n int, err error) {
var nCur int
for n < len(p) {
if err := dadj.adjust(); err != nil {
return n, dadj.augmentError("read", err)
}

Check warning on line 99 in p2p/server/deadline_adjuster.go

View check run for this annotation

Codecov / codecov/patch

p2p/server/deadline_adjuster.go#L98-L99

Added lines #L98 - L99 were not covered by tests
// Request at most chunkSize more bytes, never past the end of p.
to := min(len(p), n+dadj.chunkSize)
nCur, err = dadj.peerStream.Read(p[n:to])
poszu marked this conversation as resolved.
Show resolved Hide resolved
// Count the bytes even if the read also returned an error.
n += nCur
dadj.totalRead += nCur
if err != nil {
return n, dadj.augmentError("read", err)
}
if n < to {
poszu marked this conversation as resolved.
Show resolved Hide resolved
// Short read, don't try to read more data
break
}
}
return n, nil
}

// Write sends p to the underlying stream in pieces of at most chunkSize
// bytes, calling adjust before every underlying write so the deadline keeps
// up with the transfer.
//
// It returns the number of bytes written. The loop continues until all of p
// has been written or an error occurs. Errors from adjust and from the stream
// go through augmentError.
func (dadj *deadlineAdjuster) Write(p []byte) (n int, err error) {
var nCur int
for n < len(p) {
if err := dadj.adjust(); err != nil {
return n, dadj.augmentError("write", err)
}
// Send at most chunkSize more bytes, never past the end of p.
to := min(len(p), n+dadj.chunkSize)
nCur, err = dadj.peerStream.Write(p[n:to])
// Count the bytes even if the write also returned an error.
n += nCur
dadj.totalWritten += nCur
if err != nil {
return n, dadj.augmentError("write", err)
}
poszu marked this conversation as resolved.
Show resolved Hide resolved
}
return n, nil
}
128 changes: 128 additions & 0 deletions p2p/server/deadline_adjuster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package server

import (
"testing"
"time"

"github.com/jonboulle/clockwork"
"github.com/libp2p/go-yamux/v4"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

"github.com/spacemeshos/go-spacemesh/p2p/server/mocks"
)

// TestDeadlineAdjuster drives a deadlineAdjuster over a mocked peerStream
// with a fake clock and a chunk size of 4. It checks the byte counts and data
// returned by Read/Write, the contents of augmented timeout errors and the
// exact sequence of deadlines set on the stream.
func TestDeadlineAdjuster(t *testing.T) {
ctrl := gomock.NewController(t)
s := mocks.NewMockpeerStream(ctrl)
clock := clockwork.NewFakeClock()

// Data returned by successive underlying Read calls and data expected by
// successive underlying Write calls, in order.
readChunks := []string{"xy", "ABCD", "EF", "0123", "4567", "89"}
writeChunks := []string{"foo", "abcd", "efgh", "ijk", "bbbc"}
poszu marked this conversation as resolved.
Show resolved Hide resolved

start := clock.Now()
// Every deadline passed to SetDeadline, as whole seconds since start.
var deadlines []int
s.EXPECT().
SetDeadline(gomock.Any()).
DoAndReturn(func(dt time.Time) error {
d := dt.Sub(start)
require.Equal(t, d, d.Truncate(time.Second))
deadlines = append(deadlines, int(d/time.Second))
return nil
}).
AnyTimes()

// Each successful underlying read advances the fake clock by one second.
var readCalls []any
for _, str := range readChunks {
chunk := []byte(str)
readCalls = append(readCalls, s.EXPECT().
Read(gomock.Any()).
DoAndReturn(func(b []byte) (int, error) {
clock.Advance(time.Second)
copy(b, []byte(chunk))
return len(chunk), nil
}))
}
// The final read takes ten seconds and fails with a timeout after one byte.
readCalls = append(readCalls, s.EXPECT().
Read(gomock.Any()).
DoAndReturn(func(b []byte) (int, error) {
clock.Advance(10 * time.Second)
return 1, yamux.ErrTimeout
}))
gomock.InOrder(readCalls...)

// Each successful underlying write advances the fake clock by one second.
var writeCalls []any
for _, str := range writeChunks {
chunk := []byte(str)
writeCalls = append(writeCalls, s.EXPECT().
Write(chunk).DoAndReturn(func([]byte) (int, error) {
clock.Advance(time.Second)
return len(chunk), nil
}))
}
// The last two writes each take ten seconds and fail with a timeout after
// two bytes.
for i := 0; i < 2; i++ {
writeCalls = append(writeCalls, s.EXPECT().
Write(gomock.Any()).
DoAndReturn(func(b []byte) (int, error) {
clock.Advance(10 * time.Second)
return 2, yamux.ErrTimeout
}))
}
gomock.InOrder(writeCalls...)

// Swap in the fake clock and a tiny chunk size so that chunking and
// deadline adjustments are exercised with a few bytes of data.
dadj := newDeadlineAdjuster(s, "test", 10*time.Second, 35*time.Second)
dadj.clock = clock
dadj.chunkSize = 4

b := make([]byte, 2)
n, err := dadj.Read(b)
require.NoError(t, err)
require.Equal(t, 2, n)
require.Equal(t, []byte("xy"), b)

b = make([]byte, 10)
n, err = dadj.Read(b) // short read
require.NoError(t, err)
require.Equal(t, 6, n)
require.Equal(t, []byte("ABCDEF"), b[:n])

b = make([]byte, 10)
n, err = dadj.Read(b)
require.NoError(t, err)
require.Equal(t, 10, n)
require.Equal(t, []byte("0123456789"), b)

n, err = dadj.Write([]byte("foo"))
require.NoError(t, err)
require.Equal(t, 3, n)

n, err = dadj.Write([]byte("abcdefghijk"))
require.NoError(t, err)
require.Equal(t, 11, n)

// Bytes returned together with a timeout error are still counted.
b = make([]byte, 2)
n, err = dadj.Read(b)
require.Equal(t, 1, n)
require.ErrorIs(t, err, yamux.ErrTimeout)
require.ErrorContains(t, err, "19 bytes read, 14 bytes written, timeout 10s, hard timeout 35s")

n, err = dadj.Write([]byte("bbbcdef"))
require.Equal(t, 6, n)
require.ErrorIs(t, err, yamux.ErrTimeout)
require.ErrorContains(t, err, "19 bytes read, 20 bytes written, timeout 10s, hard timeout 35s")

// this causes deadline to be set at the hard deadline
n, err = dadj.Write([]byte("dd"))
require.Equal(t, 2, n)
require.ErrorIs(t, err, yamux.ErrTimeout)
require.ErrorContains(t, err, "19 bytes read, 22 bytes written, timeout 10s, hard timeout 35s")

// this write doesn't even start as we're past the hard deadline
require.Equal(t, 41*time.Second, clock.Now().Sub(start))
n, err = dadj.Write([]byte("ddd"))
require.Equal(t, 0, n)
require.ErrorIs(t, err, yamux.ErrTimeout)
require.ErrorContains(t, err, "19 bytes read, 22 bytes written, timeout 10s, hard timeout 35s")

// The last recorded deadline is capped at the 35s hard timeout.
require.Equal(t, []int{10, 12, 14, 16, 18, 20, 35}, deadlines)
}
25 changes: 25 additions & 0 deletions p2p/server/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package server

import (
"context"
"io"
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)

//go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./interface.go

// Host lists the methods of the libp2p Host interface that server depends on.
// Any type providing these methods can be used with server.
type Host interface {
	SetStreamHandler(pid protocol.ID, handler network.StreamHandler)
	NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (network.Stream, error)
	Network() network.Network
}

// peerStream is the minimal stream abstraction used by the server: a
// readable, writable and closable byte stream whose I/O deadline can be set.
type peerStream interface {
	io.Reader
	io.Writer
	io.Closer
	// SetDeadline sets the deadline applied to the stream's I/O.
	SetDeadline(deadline time.Time) error
}
Loading