From addbaa6a7656a14c2e5472376673bfe5b4c6e23b Mon Sep 17 00:00:00 2001 From: Dmitry Date: Tue, 5 Mar 2024 08:24:38 +0100 Subject: [PATCH 01/10] reworked atx sync that persists results incrementally --- config/presets/fastnet.go | 1 + fetch/mesh_data.go | 62 ++++- log/zap.go | 8 + node/node.go | 4 + p2p/server/server.go | 10 +- sql/atxsync/atxsync.go | 82 +++++++ sql/atxsync/atxsync_test.go | 82 +++++++ sql/migrations/local/0004_atx_sync.sql | 14 ++ syncer/atxsync/mocks/mocks.go | 226 +++++++++++++++++ syncer/atxsync/syncer.go | 328 +++++++++++++++++++++++++ syncer/atxsync/syncer_test.go | 228 +++++++++++++++++ syncer/interface.go | 5 +- syncer/mocks/mocks.go | 99 +++++--- syncer/syncer.go | 18 +- syncer/syncer_test.go | 41 ++-- 15 files changed, 1135 insertions(+), 73 deletions(-) create mode 100644 sql/atxsync/atxsync.go create mode 100644 sql/atxsync/atxsync_test.go create mode 100644 sql/migrations/local/0004_atx_sync.sql create mode 100644 syncer/atxsync/mocks/mocks.go create mode 100644 syncer/atxsync/syncer.go create mode 100644 syncer/atxsync/syncer_test.go diff --git a/config/presets/fastnet.go b/config/presets/fastnet.go index 3365513745..9ef40d844c 100644 --- a/config/presets/fastnet.go +++ b/config/presets/fastnet.go @@ -51,6 +51,7 @@ func fastnet() config.Config { conf.LayerDuration = 15 * time.Second conf.Sync.Interval = 5 * time.Second conf.Sync.GossipDuration = 10 * time.Second + conf.Sync.AtxSync.EpochInfoInterval = 20 * time.Second conf.LayersPerEpoch = 4 conf.RegossipAtxInterval = 30 * time.Second diff --git a/fetch/mesh_data.go b/fetch/mesh_data.go index 77c467fa20..0e82723b85 100644 --- a/fetch/mesh_data.go +++ b/fetch/mesh_data.go @@ -4,8 +4,10 @@ import ( "context" "errors" "fmt" - "sync/atomic" + "strings" + "sync" + "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" "github.com/spacemeshos/go-spacemesh/activation" @@ -66,8 +68,11 @@ func (f *Fetch) getHashes( pendingMetric := pendingHashReqs.WithLabelValues(string(hint)) pendingMetric.Add(float64(len(hashes))) - var eg errgroup.Group - var failed atomic.Uint64 + var ( + eg errgroup.Group + mu sync.Mutex + bfailure = BatchError{Errors: map[types.Hash32]error{}} + ) for i, hash := range hashes { if err := options.limiter.Acquire(ctx, 1); err != nil { pendingMetric.Add(float64(i - len(hashes))) @@ -97,12 +102,15 @@ func (f *Fetch) getHashes( options.limiter.Release(1) pendingMetric.Add(-1) if p.err != nil { - f.logger.Debug("failed to get hash", + f.logger.With().Debug("failed to get hash", log.String("hint", string(hint)), log.Stringer("hash", h), log.Err(p.err), ) - failed.Add(1) + + mu.Lock() + bfailure.Add(h, p.err) + mu.Unlock() } return nil } @@ -110,8 +118,8 @@ func (f *Fetch) getHashes( } eg.Wait() - if failed.Load() > 0 { - return fmt.Errorf("failed to fetch %d hashes out of %d", failed.Load(), len(hashes)) + if !bfailure.Empty() { + return &bfailure } return nil } @@ -313,3 +321,43 @@ func (f *Fetch) GetCert( } return nil, fmt.Errorf("failed to get cert %v/%s from %d peers: %w", lid, bid.String(), len(peers), ctx.Err()) } + +type BatchError struct { + Errors map[types.Hash32]error +} + +func (b *BatchError) Empty() bool { + return len(b.Errors) == 0 +} + +func (b *BatchError) Add(id types.Hash32, err error) { + if b.Errors == nil { + b.Errors = map[types.Hash32]error{} + } + b.Errors[id] = err +} + +func (b *BatchError) Error() string { + var builder strings.Builder + builder.WriteString("batch failure: ") + for hash, err := range b.Errors { + builder.WriteString(hash.ShortString()) + builder.WriteString("=") + builder.WriteString(err.Error()) + } + return builder.String() +} + +func (b *BatchError) MarshalLogObject(encoder log.ObjectEncoder) error { + encoder.AddArray("errors", log.ArrayMarshalerFunc(func(encoder log.ArrayEncoder) error { + for hash, err := range b.Errors { + encoder.AppendObject(zapcore.ObjectMarshalerFunc(func(encoder log.ObjectEncoder) error { + encoder.AddString("id", hash.ShortString()) + encoder.AddString("error", err.Error()) + return nil + })) + } + return nil + })) + return nil +} diff --git a/log/zap.go b/log/zap.go index 9d5fd7d008..c0a67419ea 100644 --- a/log/zap.go +++ b/log/zap.go @@ -198,6 +198,14 @@ func ZContext(ctx context.Context) zap.Field { return zap.Inline(&marshalledContext{Context: ctx}) } +func NiceZapError(err error) zap.Field { + var loggable ObjectMarshaller + if errors.As(err, &loggable) { + return zap.Inline(loggable) + } + return zap.Error(err) +} + func Any(key string, value any) Field { return Field(zap.Any(key, value)) } diff --git a/node/node.go b/node/node.go index 8897ea2ce9..657e86ff19 100644 --- a/node/node.go +++ b/node/node.go @@ -858,6 +858,10 @@ func (app *App) initServices(ctx context.Context) error { fetcher, patrol, app.certifier, + atxsync.New(fetcher, app.clock, app.db, app.localDB, + atxsync.WithConfig(app.Config.Sync.AtxSync), + atxsync.WithLogger(app.syncLogger.Zap()), + ), syncer.WithConfig(syncerConf), syncer.WithLogger(app.syncLogger), ) diff --git a/p2p/server/server.go b/p2p/server/server.go index 84cf1779a5..be90fb6553 100644 --- a/p2p/server/server.go +++ b/p2p/server/server.go @@ -28,8 +28,12 @@ type DecayingTagSpec struct { Cap int `mapstructure:"cap"` } -// ErrNotConnected is returned when peer is not connected. -var ErrNotConnected = errors.New("peer is not connected") +var ( + // ErrNotConnected is returned when peer is not connected. + ErrNotConnected = errors.New("peer is not connected") + // ErrPeerResponseFailed raised if peer responded with an error. + ErrPeerResponseFailed = errors.New("peer response failed") +) // Opt is a type to configure a server. type Opt func(s *Server) @@ -343,7 +347,7 @@ func (s *Server) Request(ctx context.Context, pid peer.ID, req []byte) ([]byte, s.metrics.clientServerError.Inc() s.metrics.clientLatency.Observe(took) } - return nil, errors.New(data.Error) + return nil, fmt.Errorf("%w: %s", ErrPeerResponseFailed, data.Error) case s.metrics != nil: s.metrics.clientSucceeded.Inc() s.metrics.clientLatency.Observe(took) diff --git a/sql/atxsync/atxsync.go b/sql/atxsync/atxsync.go new file mode 100644 index 0000000000..baaa08b511 --- /dev/null +++ b/sql/atxsync/atxsync.go @@ -0,0 +1,82 @@ +package atxsync + +import ( + "fmt" + "time" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/sql" +) + +func GetSyncState(db sql.Executor, epoch types.EpochID) (map[types.ATXID]int, error) { + states := map[types.ATXID]int{} + _, err := db.Exec("select id, requests from atx_sync_state where epoch = ?1", + func(stmt *sql.Statement) { + stmt.BindInt64(1, int64(epoch)) + }, func(stmt *sql.Statement) bool { + var id types.ATXID + stmt.ColumnBytes(0, id[:]) + states[id] = int(stmt.ColumnInt64(1)) + return true + }) + if err != nil { + return nil, fmt.Errorf("select synced atx ids for epoch failed %v: %w", epoch, err) + } + return states, nil +} + +func SaveSyncState(db sql.Executor, epoch types.EpochID, states map[types.ATXID]int, max int) error { + for id, requests := range states { + var err error + if requests == max { + _, err = db.Exec(`delete from atx_sync_state where epoch = ?1 and id = ?2`, func(stmt *sql.Statement) { + stmt.BindInt64(1, int64(epoch)) + stmt.BindBytes(2, id[:]) + }, nil) + } else { + _, err = db.Exec(`insert into atx_sync_state + (epoch, id, requests) values (?1, ?2, ?3) + on conflict(epoch, id) do update set requests = ?3;`, + func(stmt *sql.Statement) { + stmt.BindInt64(1, int64(epoch)) + stmt.BindBytes(2, id[:]) + stmt.BindInt64(3, int64(requests)) + }, nil) + } + if err != nil { + return fmt.Errorf("insert synced atx id %v/%v failed: %w", epoch, id.ShortString(), err) + } + } + return nil +} + +func SaveRequestTime(db sql.Executor, epoch types.EpochID, timestamp time.Time) error { + _, err := db.Exec(`insert into atx_sync_requests + (epoch, timestamp) values (?1, ?2) + on conflict(epoch) do update set timestamp = ?2;`, + func(stmt *sql.Statement) { + stmt.BindInt64(1, int64(epoch)) + stmt.BindInt64(2, timestamp.Unix()) + }, nil) + if err != nil { + return fmt.Errorf("insert request time for epoch %v failed: %w", epoch, err) + } + return nil +} + +func GetRequestTime(db sql.Executor, epoch types.EpochID) (time.Time, error) { + var timestamp time.Time + rows, err := db.Exec("select timestamp from atx_sync_requests where epoch = ?1", + func(stmt *sql.Statement) { + stmt.BindInt64(1, int64(epoch)) + }, func(stmt *sql.Statement) bool { + timestamp = time.Unix(stmt.ColumnInt64(0), 0) + return true + }) + if err != nil { + return time.Time{}, fmt.Errorf("select request time for epoch %v failed: %w", epoch, err) + } else if rows == 0 { + return time.Time{}, fmt.Errorf("%w: no request time for epoch %v", sql.ErrNotFound, epoch) + } + return timestamp, nil +} diff --git a/sql/atxsync/atxsync_test.go b/sql/atxsync/atxsync_test.go new file mode 100644 index 0000000000..dbb5183ecd --- /dev/null +++ b/sql/atxsync/atxsync_test.go @@ -0,0 +1,82 @@ +package atxsync + +import ( + "encoding/binary" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/localsql" +) + +func TestSyncState(t *testing.T) { + db := localsql.InMemory() + for epoch := types.EpochID(1); epoch < types.EpochID(5); epoch++ { + state, err := GetSyncState(db, epoch) + require.NoError(t, err) + require.Empty(t, state) + + states := map[types.ATXID]int{} + const size = 100 + for i := 0; i < size; i++ { + id := types.ATXID{} + binary.BigEndian.PutUint64(id[:], uint64(i)) + states[id] = 0 + } + require.NoError(t, SaveSyncState(db, epoch, states, 1)) + + state, err = GetSyncState(db, epoch) + require.NoError(t, err) + + require.Len(t, state, size) + const max = 10 + for i := 0; i < size; i++ { + id := types.ATXID{} + binary.BigEndian.PutUint64(id[:], uint64(i)) + requests, exists := state[id] + require.True(t, exists) + require.Equal(t, 0, requests) + if i < size/2 { + state[id] = 1 + } else { + state[id] = max + } + } + + require.NoError(t, SaveSyncState(db, epoch, state, max)) + + updated, err := GetSyncState(db, epoch) + require.NoError(t, err) + for i := 0; i < size; i++ { + id := types.ATXID{} + binary.BigEndian.PutUint64(id[:], uint64(i)) + if i >= size/2 { + delete(state, id) + } + } + require.Equal(t, state, updated) + + } +} + +func TestRequestTime(t *testing.T) { + db := localsql.InMemory() + for epoch := types.EpochID(1); epoch < types.EpochID(5); epoch++ { + timestamp, err := GetRequestTime(db, epoch) + require.ErrorIs(t, err, sql.ErrNotFound) + require.True(t, timestamp.IsZero()) + + for step := time.Duration(0); step < 10*time.Second; step += time.Second { + now := time.Now().Add(step) + require.NoError(t, SaveRequestTime(db, epoch, now)) + + timestamp, err = GetRequestTime(db, epoch) + require.NoError(t, err) + // now is truncated to a multiple of seconds, as we discard nanonesconds when saving request time + require.Equal(t, now.Truncate(time.Second), timestamp) + } + } +} diff --git a/sql/migrations/local/0004_atx_sync.sql b/sql/migrations/local/0004_atx_sync.sql new file mode 100644 index 0000000000..dfe8afcbed --- /dev/null +++ b/sql/migrations/local/0004_atx_sync.sql @@ -0,0 +1,14 @@ +CREATE TABLE atx_sync_state +( + epoch INT NOT NULL, + id CHAR(32) NOT NULL, + requests INT NOT NULL DEFAULT 0, + PRIMARY KEY (epoch, id) +) WITHOUT ROWID; + +CREATE TABLE atx_sync_requests +( + epoch INT NOT NULL, + timestamp INT NOT NULL, + PRIMARY KEY (epoch) +) WITHOUT ROWID; \ No newline at end of file diff --git a/syncer/atxsync/mocks/mocks.go b/syncer/atxsync/mocks/mocks.go new file mode 100644 index 0000000000..0f24359089 --- /dev/null +++ b/syncer/atxsync/mocks/mocks.go @@ -0,0 +1,226 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./syncer.go +// +// Generated by this command: +// +// mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./syncer.go +// + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + time "time" + + types "github.com/spacemeshos/go-spacemesh/common/types" + fetch "github.com/spacemeshos/go-spacemesh/fetch" + p2p "github.com/spacemeshos/go-spacemesh/p2p" + system "github.com/spacemeshos/go-spacemesh/system" + gomock "go.uber.org/mock/gomock" +) + +// Mockclock is a mock of clock interface. +type Mockclock struct { + ctrl *gomock.Controller + recorder *MockclockMockRecorder +} + +// MockclockMockRecorder is the mock recorder for Mockclock. +type MockclockMockRecorder struct { + mock *Mockclock +} + +// NewMockclock creates a new mock instance. +func NewMockclock(ctrl *gomock.Controller) *Mockclock { + mock := &Mockclock{ctrl: ctrl} + mock.recorder = &MockclockMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *Mockclock) EXPECT() *MockclockMockRecorder { + return m.recorder +} + +// LayerToTime mocks base method. +func (m *Mockclock) LayerToTime(arg0 types.LayerID) time.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LayerToTime", arg0) + ret0, _ := ret[0].(time.Time) + return ret0 +} + +// LayerToTime indicates an expected call of LayerToTime. +func (mr *MockclockMockRecorder) LayerToTime(arg0 any) *MockclockLayerToTimeCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LayerToTime", reflect.TypeOf((*Mockclock)(nil).LayerToTime), arg0) + return &MockclockLayerToTimeCall{Call: call} +} + +// MockclockLayerToTimeCall wrap *gomock.Call +type MockclockLayerToTimeCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockclockLayerToTimeCall) Return(arg0 time.Time) *MockclockLayerToTimeCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockclockLayerToTimeCall) Do(f func(types.LayerID) time.Time) *MockclockLayerToTimeCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockclockLayerToTimeCall) DoAndReturn(f func(types.LayerID) time.Time) *MockclockLayerToTimeCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// Mockfetcher is a mock of fetcher interface. +type Mockfetcher struct { + ctrl *gomock.Controller + recorder *MockfetcherMockRecorder +} + +// MockfetcherMockRecorder is the mock recorder for Mockfetcher. +type MockfetcherMockRecorder struct { + mock *Mockfetcher +} + +// NewMockfetcher creates a new mock instance. +func NewMockfetcher(ctrl *gomock.Controller) *Mockfetcher { + mock := &Mockfetcher{ctrl: ctrl} + mock.recorder = &MockfetcherMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *Mockfetcher) EXPECT() *MockfetcherMockRecorder { + return m.recorder +} + +// GetAtxs mocks base method. +func (m *Mockfetcher) GetAtxs(arg0 context.Context, arg1 []types.ATXID, arg2 ...system.GetAtxOpt) error { + m.ctrl.T.Helper() + varargs := []any{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GetAtxs", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// GetAtxs indicates an expected call of GetAtxs. +func (mr *MockfetcherMockRecorder) GetAtxs(arg0, arg1 any, arg2 ...any) *MockfetcherGetAtxsCall { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0, arg1}, arg2...) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAtxs", reflect.TypeOf((*Mockfetcher)(nil).GetAtxs), varargs...) + return &MockfetcherGetAtxsCall{Call: call} +} + +// MockfetcherGetAtxsCall wrap *gomock.Call +type MockfetcherGetAtxsCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockfetcherGetAtxsCall) Return(arg0 error) *MockfetcherGetAtxsCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockfetcherGetAtxsCall) Do(f func(context.Context, []types.ATXID, ...system.GetAtxOpt) error) *MockfetcherGetAtxsCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockfetcherGetAtxsCall) DoAndReturn(f func(context.Context, []types.ATXID, ...system.GetAtxOpt) error) *MockfetcherGetAtxsCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// PeerEpochInfo mocks base method. +func (m *Mockfetcher) PeerEpochInfo(arg0 context.Context, arg1 p2p.Peer, arg2 types.EpochID) (*fetch.EpochData, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PeerEpochInfo", arg0, arg1, arg2) + ret0, _ := ret[0].(*fetch.EpochData) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PeerEpochInfo indicates an expected call of PeerEpochInfo. +func (mr *MockfetcherMockRecorder) PeerEpochInfo(arg0, arg1, arg2 any) *MockfetcherPeerEpochInfoCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PeerEpochInfo", reflect.TypeOf((*Mockfetcher)(nil).PeerEpochInfo), arg0, arg1, arg2) + return &MockfetcherPeerEpochInfoCall{Call: call} +} + +// MockfetcherPeerEpochInfoCall wrap *gomock.Call +type MockfetcherPeerEpochInfoCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockfetcherPeerEpochInfoCall) Return(arg0 *fetch.EpochData, arg1 error) *MockfetcherPeerEpochInfoCall { + c.Call = c.Call.Return(arg0, arg1) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockfetcherPeerEpochInfoCall) Do(f func(context.Context, p2p.Peer, types.EpochID) (*fetch.EpochData, error)) *MockfetcherPeerEpochInfoCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockfetcherPeerEpochInfoCall) DoAndReturn(f func(context.Context, p2p.Peer, types.EpochID) (*fetch.EpochData, error)) *MockfetcherPeerEpochInfoCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// SelectBestShuffled mocks base method. +func (m *Mockfetcher) SelectBestShuffled(arg0 int) []p2p.Peer { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SelectBestShuffled", arg0) + ret0, _ := ret[0].([]p2p.Peer) + return ret0 +} + +// SelectBestShuffled indicates an expected call of SelectBestShuffled. +func (mr *MockfetcherMockRecorder) SelectBestShuffled(arg0 any) *MockfetcherSelectBestShuffledCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SelectBestShuffled", reflect.TypeOf((*Mockfetcher)(nil).SelectBestShuffled), arg0) + return &MockfetcherSelectBestShuffledCall{Call: call} +} + +// MockfetcherSelectBestShuffledCall wrap *gomock.Call +type MockfetcherSelectBestShuffledCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockfetcherSelectBestShuffledCall) Return(arg0 []p2p.Peer) *MockfetcherSelectBestShuffledCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockfetcherSelectBestShuffledCall) Do(f func(int) []p2p.Peer) *MockfetcherSelectBestShuffledCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockfetcherSelectBestShuffledCall) DoAndReturn(f func(int) []p2p.Peer) *MockfetcherSelectBestShuffledCall { + c.Call = c.Call.DoAndReturn(f) + return c +} diff --git a/syncer/atxsync/syncer.go b/syncer/atxsync/syncer.go new file mode 100644 index 0000000000..50c2fd50bb --- /dev/null +++ b/syncer/atxsync/syncer.go @@ -0,0 +1,328 @@ +package atxsync + +import ( + "context" + "errors" + "fmt" + "time" + + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/fetch" + "github.com/spacemeshos/go-spacemesh/log" + "github.com/spacemeshos/go-spacemesh/p2p" + "github.com/spacemeshos/go-spacemesh/p2p/server" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/atxs" + "github.com/spacemeshos/go-spacemesh/sql/atxsync" + "github.com/spacemeshos/go-spacemesh/sql/localsql" + "github.com/spacemeshos/go-spacemesh/system" +) + +//go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./syncer.go + +type clock interface { + LayerToTime(types.LayerID) time.Time +} + +type fetcher interface { + SelectBestShuffled(int) []p2p.Peer + PeerEpochInfo(context.Context, p2p.Peer, types.EpochID) (*fetch.EpochData, error) + system.AtxFetcher +} + +type Opt func(*Syncer) + +func WithLogger(logger *zap.Logger) Opt { + return func(s *Syncer) { + s.logger = logger + } +} + +func DefaultConfig() Config { + return Config{ + EpochInfoInterval: 30 * time.Minute, + AtxsBatch: 1000, + RequestsLimit: 20, + EpochInfoPeers: 2, + } +} + +type Config struct { + // EpochInfoInterval between epoch info requests to the network. + EpochInfoInterval time.Duration `mapstructure:"epoch-info-request-interval"` + // EpochInfoPeers is the number of peers we will ask for epoch info, every epoch info requests interval. + EpochInfoPeers int `mapstructure:"epoch-info-peers"` + + // RequestsLimit is the maximum number of requests for single activation. + // + // The purpose of it is to prevent peers from advertising invalid atx and disappearing. + // Which will make node ask other peers for invalid atx. + // It will be reset to 0 once atx advertised again. + RequestsLimit int `mapstructure:"requests-limit"` + + // AtxsBatch is the maximum number of atxs to sync in a single request. + AtxsBatch int `mapstructure:"atxs-batch"` +} + +func WithConfig(cfg Config) Opt { + return func(s *Syncer) { + s.cfg = cfg + } +} + +func New(fetcher fetcher, clock clock, db sql.Executor, localdb *localsql.Database, opts ...Opt) *Syncer { + s := &Syncer{ + logger: zap.NewNop(), + cfg: DefaultConfig(), + fetcher: fetcher, + clock: clock, + db: db, + localdb: localdb, + } + for _, opt := range opts { + opt(s) + } + return s +} + +type Syncer struct { + logger *zap.Logger + cfg Config + fetcher fetcher + clock clock + db sql.Executor + localdb *localsql.Database +} + +func (s *Syncer) closeToTheEpoch(epoch types.EpochID, timestamp time.Time) bool { + epochStart := s.clock.LayerToTime(epoch.FirstLayer()) + return timestamp.After(epochStart) || epochStart.Sub(timestamp) < 2*s.cfg.EpochInfoInterval +} + +func (s *Syncer) Download(ctx context.Context, epoch types.EpochID) error { + s.logger.Info("starting atx sync", log.ZContext(ctx), epoch.Field().Zap()) + + state, err := atxsync.GetSyncState(s.localdb, epoch) + if err != nil { + return fmt.Errorf("failed to get state for epoch %v: %w", epoch, err) + } + lastSuccess, err := atxsync.GetRequestTime(s.localdb, epoch) + if err != nil && !errors.Is(err, sql.ErrNotFound) { + return fmt.Errorf("failed to get last request time for epoch %v: %w", epoch, err) + } + // in case of immediate we will request epoch info without waiting EpochInfoInterval + immediate := len(state) == 0 || (errors.Is(err, sql.ErrNotFound) || !s.closeToTheEpoch(epoch, lastSuccess)) + + ctx, cancel := context.WithCancel(ctx) + eg, ctx := errgroup.WithContext(ctx) + updates := make(chan epochUpdate, s.cfg.EpochInfoPeers) + if len(state) == 0 { + state = map[types.ATXID]int{} + } else { + updates <- epochUpdate{time: lastSuccess, update: state} + } + // termination requires two conditions: + // - epoch info has to be successfully downloaded close to or after the epoch start + // - all atxs from that epoch have to be downloaded or they are unavailable. + // atx is unavailable if it was requested more than RequestsLimit times, and no peer provided it. + eg.Go(func() error { + return s.downloadEpochInfo(ctx, epoch, immediate, updates) + }) + eg.Go(func() error { + err := s.downloadAtxs(ctx, epoch, state, updates) + cancel() + return err + }) + return eg.Wait() +} + +func (s *Syncer) downloadEpochInfo( + ctx context.Context, + epoch types.EpochID, + immediate bool, + updates chan epochUpdate, +) error { + interval := s.cfg.EpochInfoInterval + if immediate { + interval = 0 + } + for { + select { + case <-ctx.Done(): + return nil + // TODO(dshulyak) this has to be randomized in a followup + // when sync will be schedulled in advance, in order to smooth out request rate across the network + case <-time.After(interval): + } + + peers := s.fetcher.SelectBestShuffled(s.cfg.EpochInfoPeers) + if len(peers) == 0 { + return fmt.Errorf("no peers available") + } + // do not run it concurrently, epoch info is large and will continue to grow + for _, peer := range peers { + epochData, err := s.fetcher.PeerEpochInfo(ctx, peer, epoch) + if err != nil || epochData == nil { + if errors.Is(err, context.Canceled) { + return nil + } + s.logger.Warn("failed to download epoch info", + log.ZContext(ctx), + epoch.Field().Zap(), + zap.String("peer", peer.String()), + zap.Error(err), + ) + continue + } + s.logger.Info("downloaded epoch info", + log.ZContext(ctx), + epoch.Field().Zap(), + zap.String("peer", peer.String()), + zap.Int("atxs", len(epochData.AtxIDs)), + ) + // adding hashes to fetcher is not useful as they overflow the cache and are not used + // so we switch to asking best peers immediately + update := make(map[types.ATXID]int, len(epochData.AtxIDs)) + for _, atx := range epochData.AtxIDs { + update[atx] = 0 + } + select { + case <-ctx.Done(): + return nil + case updates <- epochUpdate{time: time.Now(), update: update}: + } + // after first success switch to requests after interval + interval = s.cfg.EpochInfoInterval + } + } +} + +func (s *Syncer) downloadAtxs( + ctx context.Context, + epoch types.EpochID, + state map[types.ATXID]int, + updates chan epochUpdate, +) error { + batch := make([]types.ATXID, 0, s.cfg.AtxsBatch) + batch = batch[:0] + var ( + downloaded = map[types.ATXID]bool{} + previouslyDownloaded = 0 + start = time.Now() + lastSuccess time.Time + progressTimestamp = start + nothingToDownload = len(state) == 0 + ) + + for { + // waiting for update if there is nothing to download + if nothingToDownload && s.closeToTheEpoch(epoch, lastSuccess) { + s.logger.Info( + "atx sync terminated", + log.ZContext(ctx), + epoch.Field().Zap(), + zap.Int("downloaded", len(downloaded)), + zap.Int("total", len(state)), + zap.Int("unavailable", len(state)-len(downloaded)), + zap.Duration("duration", time.Since(start)), + ) + return nil + } + if nothingToDownload { + select { + case <-ctx.Done(): + return nil + case update := <-updates: + lastSuccess = update.time + for atx, count := range update.update { + state[atx] = count + } + } + } else { + // otherwise check updates periodically but don't stop downloading + select { + case <-ctx.Done(): + return nil + case update := <-updates: + lastSuccess = update.time + for atx, count := range update.update { + state[atx] = count + } + default: + } + } + + for atx, requests := range state { + if downloaded[atx] { + continue + } + exists, err := atxs.Has(s.db, atx) + if err != nil { + return err + } + if exists { + downloaded[atx] = true + continue + } + if requests >= s.cfg.RequestsLimit { + delete(state, atx) + continue + } + batch = append(batch, atx) + if len(batch) == cap(batch) { + break + } + } + nothingToDownload = len(batch) == 0 + + // report progress every 10% or every 20 minutes + if progress := float64(len(downloaded) - previouslyDownloaded); progress/float64(len(state)) > 0.1 || + time.Since(progressTimestamp) > 20*time.Minute { + s.logger.Info( + "atx sync progress", + log.ZContext(ctx), + epoch.Field().Zap(), + zap.Int("downloaded", len(downloaded)), + zap.Int("total", len(state)), + zap.Int("progress", int(progress)), + zap.Float64("rate per sec", progress/time.Since(progressTimestamp).Seconds()), + ) + previouslyDownloaded = len(downloaded) + progressTimestamp = time.Now() + } + if len(batch) > 0 { + if err := s.fetcher.GetAtxs(ctx, batch); err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + s.logger.Debug("failed to download atxs", log.ZContext(ctx), log.NiceZapError(err)) + batchError := &fetch.BatchError{} + if errors.As(err, &batchError) { + for hash, err := range batchError.Errors { + if errors.Is(err, server.ErrPeerResponseFailed) { + state[types.ATXID(hash)]++ + } + } + } + } + } + + if err := s.localdb.WithTx(ctx, func(tx *sql.Tx) error { + if err := atxsync.SaveRequestTime(tx, epoch, lastSuccess); err != nil { + return fmt.Errorf("failed to save request time: %w", err) + } + return atxsync.SaveSyncState(tx, epoch, state, s.cfg.RequestsLimit) + }); err != nil { + return fmt.Errorf("failed to persist state for epoch %v: %w", epoch, err) + } + batch = batch[:0] + } +} + +type epochUpdate struct { + time time.Time + update map[types.ATXID]int +} diff --git a/syncer/atxsync/syncer_test.go b/syncer/atxsync/syncer_test.go new file mode 100644 index 0000000000..1f3a4a4440 --- /dev/null +++ b/syncer/atxsync/syncer_test.go @@ -0,0 +1,228 @@ +package atxsync + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/fetch" + "github.com/spacemeshos/go-spacemesh/log/logtest" + "github.com/spacemeshos/go-spacemesh/p2p" + "github.com/spacemeshos/go-spacemesh/p2p/server" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/atxs" + "github.com/spacemeshos/go-spacemesh/sql/atxsync" + "github.com/spacemeshos/go-spacemesh/sql/localsql" + "github.com/spacemeshos/go-spacemesh/syncer/atxsync/mocks" + "github.com/spacemeshos/go-spacemesh/system" +) + +func init() { + types.SetLayersPerEpoch(10) +} + +func aid(id string) types.ATXID { + var atx types.ATXID + copy(atx[:], id) + return atx +} + +func edata(ids ...string) *fetch.EpochData { + ed := &fetch.EpochData{} + for _, id := range ids { + ed.AtxIDs = append(ed.AtxIDs, aid(id)) + } + return ed +} + +func newTester(tb testing.TB, cfg Config) *tester { + localdb := localsql.InMemory() + db := sql.InMemory() + ctrl := gomock.NewController(tb) + clock := mocks.NewMockclock(ctrl) + fetcher := mocks.NewMockfetcher(ctrl) + syncer := New(fetcher, clock, db, localdb, WithConfig(cfg), WithLogger(logtest.New(tb).Zap())) + return &tester{ + tb: tb, + syncer: syncer, + localdb: localdb, + db: db, + cfg: cfg, + ctrl: ctrl, + clock: clock, + fetcher: fetcher, + } +} + +type tester struct { + tb testing.TB + syncer *Syncer + localdb *localsql.Database + db *sql.Database + cfg Config + ctrl *gomock.Controller + clock *mocks.Mockclock + fetcher *mocks.Mockfetcher +} + +func TestSyncer(t *testing.T) { + t.Run("sanity", func(t *testing.T) { + tester := newTester(t, Config{ + EpochInfoInterval: 100 * time.Microsecond, + EpochInfoPeers: 3, + RequestsLimit: 10, + AtxsBatch: 10, + }) + + peers := []p2p.Peer{"a", "b", "c"} + tester.fetcher.EXPECT().SelectBestShuffled(tester.cfg.EpochInfoPeers).Return(peers).AnyTimes() + publish := types.EpochID(1) + for _, p := range peers { + tester.fetcher.EXPECT(). + PeerEpochInfo(gomock.Any(), p, publish). + Return(edata("4", "1", "3", "2"), nil). + AnyTimes() + } + + tester.fetcher.EXPECT(). + GetAtxs(gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, ids []types.ATXID, _ ...system.GetAtxOpt) error { + for _, id := range ids { + require.NoError(t, atxs.Add(tester.db, atx(id))) + } + return nil + }).AnyTimes() + + past := time.Now().Add(-time.Minute) + tester.clock.EXPECT().LayerToTime(publish.FirstLayer()).Return(past).AnyTimes() + require.NoError(t, tester.syncer.Download(context.Background(), publish)) + }) + t.Run("interruptible", func(t *testing.T) { + tester := newTester(t, DefaultConfig()) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + publish := types.EpochID(1) + tester.clock.EXPECT().LayerToTime(publish.FirstLayer()).Return(time.Now()).AnyTimes() + require.NoError(t, tester.syncer.Download(ctx, publish)) + }) + t.Run("error on no peers", func(t *testing.T) { + tester := newTester(t, DefaultConfig()) + publish := types.EpochID(1) + tester.clock.EXPECT().LayerToTime(publish.FirstLayer()).Return(time.Now()).AnyTimes() + tester.fetcher.EXPECT().SelectBestShuffled(tester.cfg.EpochInfoPeers).Return(nil) + require.ErrorContains(t, tester.syncer.Download(context.Background(), publish), "no peers available") + }) + t.Run("terminate without queries if sync on time", func(t *testing.T) { + tester := newTester(t, DefaultConfig()) + publish := types.EpochID(2) + now := time.Now() + tester.clock.EXPECT().LayerToTime(publish.FirstLayer()).Return(now).AnyTimes() + + state := map[types.ATXID]int{ + aid("1"): 0, + aid("2"): 0, + } + require.NoError(t, atxsync.SaveSyncState(tester.localdb, publish, state, tester.cfg.AtxsBatch)) + lastSuccess := now.Add(-tester.cfg.EpochInfoInterval) + require.NoError(t, atxsync.SaveRequestTime(tester.localdb, publish, lastSuccess)) + for id := range state { + require.NoError(t, atxs.Add(tester.db, atx(id))) + } + require.NoError(t, tester.syncer.Download(context.Background(), publish)) + }) + t.Run("immediate epoch info retries", func(t *testing.T) { + tester := newTester(t, Config{ + EpochInfoInterval: 10 * time.Second, + EpochInfoPeers: 3, + RequestsLimit: 10, + AtxsBatch: 10, + }) + ctx, cancel := context.WithTimeout(context.Background(), tester.cfg.EpochInfoInterval/2) + defer cancel() + + peers := []p2p.Peer{"a"} + tester.fetcher.EXPECT().SelectBestShuffled(tester.cfg.EpochInfoPeers).Return(peers).AnyTimes() + publish := types.EpochID(2) + tester.fetcher.EXPECT().PeerEpochInfo(gomock.Any(), peers[0], publish).Return(nil, errors.New("bad try")) + tester.fetcher.EXPECT().PeerEpochInfo(gomock.Any(), peers[0], publish).Return(edata("1", "2", "3"), nil) + + tester.fetcher.EXPECT(). + GetAtxs(gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, ids []types.ATXID, _ ...system.GetAtxOpt) error { + for _, id := range ids { + require.NoError(t, atxs.Add(tester.db, atx(id))) + } + return nil + }).AnyTimes() + + past := time.Now().Add(-time.Minute) + tester.clock.EXPECT().LayerToTime(publish.FirstLayer()).Return(past).AnyTimes() + require.NoError(t, tester.syncer.Download(ctx, publish)) + }) + t.Run("give up on atx after max retries", func(t *testing.T) { + tester := newTester(t, Config{ + EpochInfoInterval: 200 * time.Hour, + EpochInfoPeers: 2, + RequestsLimit: 10, + AtxsBatch: 2, + }) + + peers := []p2p.Peer{"a", "b"} + tester.fetcher.EXPECT().SelectBestShuffled(tester.cfg.EpochInfoPeers).Return(peers).AnyTimes() + publish := types.EpochID(2) + good := edata("1", "2", "3") + bad := edata("4", "5", "6") + tester.fetcher.EXPECT().PeerEpochInfo(gomock.Any(), peers[0], publish).Return(good, nil) + tester.fetcher.EXPECT().PeerEpochInfo(gomock.Any(), peers[1], publish).Return(bad, nil) + + tester.fetcher.EXPECT(). + GetAtxs(gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, ids []types.ATXID, _ ...system.GetAtxOpt) error { + require.LessOrEqual(t, len(ids), tester.cfg.AtxsBatch) + berr := fetch.BatchError{} + for _, id := range ids { + for _, good := range good.AtxIDs { + if good == id { + require.NoError(t, atxs.Add(tester.db, atx(id))) + } + } + for _, bad := range bad.AtxIDs { + if bad == id { + berr.Add(bad.Hash32(), fmt.Errorf("%w: test", server.ErrPeerResponseFailed)) + } + } + } + if berr.Empty() { + return nil + } + return &berr + }). + AnyTimes() + + past := time.Now().Add(-time.Minute) + tester.clock.EXPECT().LayerToTime(publish.FirstLayer()).Return(past).AnyTimes() + require.NoError(t, tester.syncer.Download(context.Background(), publish)) + + state, err := atxsync.GetSyncState(tester.localdb, publish) + require.NoError(t, err) + for _, bad := range bad.AtxIDs { + require.NotContains(t, state, bad) + } + }) + t.Run("terminate empty epoch", func(t *testing.T) { + tester := newTester(t, DefaultConfig()) + publish := types.EpochID(2) + now := time.Now() + tester.clock.EXPECT().LayerToTime(publish.FirstLayer()).Return(now).AnyTimes() + peers := []p2p.Peer{"a"} + tester.fetcher.EXPECT().SelectBestShuffled(tester.cfg.EpochInfoPeers).Return(peers).AnyTimes() + tester.fetcher.EXPECT().PeerEpochInfo(gomock.Any(), peers[0], publish).Return(edata(), nil) + require.NoError(t, tester.syncer.Download(context.Background(), publish)) + }) +} diff --git a/syncer/interface.go b/syncer/interface.go index 62c0b4de53..af57b4ea47 100644 --- a/syncer/interface.go +++ b/syncer/interface.go @@ -33,7 +33,10 @@ type fetchLogic interface { bool, []p2p.Peer, ) ([]*fetch.LayerOpinion, []*types.Certificate, error) - GetEpochATXs(context.Context, types.EpochID) error +} + +type atxSyncer interface { + Download(context.Context, types.EpochID) error } // fetcher is the interface to the low-level fetching. diff --git a/syncer/mocks/mocks.go b/syncer/mocks/mocks.go index 69deec4381..a0927f3c8c 100644 --- a/syncer/mocks/mocks.go +++ b/syncer/mocks/mocks.go @@ -322,44 +322,6 @@ func (c *MockfetchLogicGetCertCall) DoAndReturn(f func(context.Context, types.La return c } -// GetEpochATXs mocks base method. -func (m *MockfetchLogic) GetEpochATXs(arg0 context.Context, arg1 types.EpochID) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetEpochATXs", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// GetEpochATXs indicates an expected call of GetEpochATXs. -func (mr *MockfetchLogicMockRecorder) GetEpochATXs(arg0, arg1 any) *MockfetchLogicGetEpochATXsCall { - mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEpochATXs", reflect.TypeOf((*MockfetchLogic)(nil).GetEpochATXs), arg0, arg1) - return &MockfetchLogicGetEpochATXsCall{Call: call} -} - -// MockfetchLogicGetEpochATXsCall wrap *gomock.Call -type MockfetchLogicGetEpochATXsCall struct { - *gomock.Call -} - -// Return rewrite *gomock.Call.Return -func (c *MockfetchLogicGetEpochATXsCall) Return(arg0 error) *MockfetchLogicGetEpochATXsCall { - c.Call = c.Call.Return(arg0) - return c -} - -// Do rewrite *gomock.Call.Do -func (c *MockfetchLogicGetEpochATXsCall) Do(f func(context.Context, types.EpochID) error) *MockfetchLogicGetEpochATXsCall { - c.Call = c.Call.Do(f) - return c -} - -// DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *MockfetchLogicGetEpochATXsCall) DoAndReturn(f func(context.Context, types.EpochID) error) *MockfetchLogicGetEpochATXsCall { - c.Call = c.Call.DoAndReturn(f) - return c -} - // GetLayerData mocks base method. func (m *MockfetchLogic) GetLayerData(arg0 context.Context, arg1 p2p.Peer, arg2 types.LayerID) ([]byte, error) { m.ctrl.T.Helper() @@ -788,6 +750,67 @@ func (c *MockfetchLogicSelectBestShuffledCall) DoAndReturn(f func(int) []p2p.Pee return c } +// MockatxSyncer is a mock of atxSyncer interface. +type MockatxSyncer struct { + ctrl *gomock.Controller + recorder *MockatxSyncerMockRecorder +} + +// MockatxSyncerMockRecorder is the mock recorder for MockatxSyncer. +type MockatxSyncerMockRecorder struct { + mock *MockatxSyncer +} + +// NewMockatxSyncer creates a new mock instance. +func NewMockatxSyncer(ctrl *gomock.Controller) *MockatxSyncer { + mock := &MockatxSyncer{ctrl: ctrl} + mock.recorder = &MockatxSyncerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockatxSyncer) EXPECT() *MockatxSyncerMockRecorder { + return m.recorder +} + +// Download mocks base method. +func (m *MockatxSyncer) Download(arg0 context.Context, arg1 types.EpochID) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Download", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Download indicates an expected call of Download. +func (mr *MockatxSyncerMockRecorder) Download(arg0, arg1 any) *MockatxSyncerDownloadCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockatxSyncer)(nil).Download), arg0, arg1) + return &MockatxSyncerDownloadCall{Call: call} +} + +// MockatxSyncerDownloadCall wrap *gomock.Call +type MockatxSyncerDownloadCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockatxSyncerDownloadCall) Return(arg0 error) *MockatxSyncerDownloadCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockatxSyncerDownloadCall) Do(f func(context.Context, types.EpochID) error) *MockatxSyncerDownloadCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockatxSyncerDownloadCall) DoAndReturn(f func(context.Context, types.EpochID) error) *MockatxSyncerDownloadCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + // Mockfetcher is a mock of fetcher interface. type Mockfetcher struct { ctrl *gomock.Controller diff --git a/syncer/syncer.go b/syncer/syncer.go index 4513520949..e11265c719 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -16,6 +16,7 @@ import ( "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/mesh" "github.com/spacemeshos/go-spacemesh/p2p" + "github.com/spacemeshos/go-spacemesh/syncer/atxsync" "github.com/spacemeshos/go-spacemesh/system" ) @@ -27,10 +28,11 @@ type Config struct { SyncCertDistance uint32 MaxStaleDuration time.Duration `mapstructure:"maxstaleduration"` Standalone bool - GossipDuration time.Duration `mapstructure:"gossipduration"` - DisableMeshAgreement bool `mapstructure:"disable-mesh-agreement"` - DisableAtxReconciliation bool `mapstructure:"disable-atx-reconciliation"` - OutOfSyncThresholdLayers uint32 `mapstructure:"out-of-sync-threshold"` + GossipDuration time.Duration `mapstructure:"gossipduration"` + DisableMeshAgreement bool `mapstructure:"disable-mesh-agreement"` + DisableAtxReconciliation bool `mapstructure:"disable-atx-reconciliation"` + OutOfSyncThresholdLayers uint32 `mapstructure:"out-of-sync-threshold"` + AtxSync atxsync.Config `mapstructure:"atx-sync"` } // DefaultConfig for the syncer. @@ -43,6 +45,7 @@ func DefaultConfig() Config { MaxStaleDuration: time.Second, GossipDuration: 15 * time.Second, OutOfSyncThresholdLayers: 3, + AtxSync: atxsync.DefaultConfig(), } } @@ -114,6 +117,7 @@ type Syncer struct { cfg Config cdb *datastore.CachedDB + atxsyncer atxSyncer ticker layerTicker beacon system.BeaconGetter mesh *mesh.Mesh @@ -149,12 +153,14 @@ func NewSyncer( fetcher fetcher, patrol layerPatrol, ch certHandler, + atxSyncer atxSyncer, opts ...Option, ) *Syncer { s := &Syncer{ logger: log.NewNop(), cfg: DefaultConfig(), cdb: cdb, + atxsyncer: atxSyncer, ticker: ticker, beacon: beacon, mesh: mesh, @@ -443,7 +449,7 @@ func (s *Syncer) syncAtx(ctx context.Context) error { } } if s.cfg.DisableAtxReconciliation { - s.logger.Debug("atx sync disabled") + s.logger.Debug("atx sync reconciliation is disabled") return nil } // steady state atx syncing @@ -563,7 +569,7 @@ func (s *Syncer) syncLayer(ctx context.Context, layerID types.LayerID, peers ... // fetching ATXs published the specified epoch. func (s *Syncer) fetchATXsForEpoch(ctx context.Context, epoch types.EpochID) error { - if err := s.dataFetcher.GetEpochATXs(ctx, epoch); err != nil { + if err := s.atxsyncer.Download(ctx, epoch); err != nil { return err } s.setLastAtxEpoch(epoch) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 7896065ad2..ca304a8da7 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -65,6 +65,7 @@ type testSyncer struct { mTicker *mockLayerTicker mDataFetcher *mocks.MockfetchLogic + mAtxSyncer *mocks.MockatxSyncer mBeacon *smocks.MockBeaconGetter mLyrPatrol *mocks.MocklayerPatrol mVm *mmocks.MockvmState @@ -78,9 +79,11 @@ func newTestSyncer(t *testing.T, interval time.Duration) *testSyncer { lg := logtest.New(t) mt := newMockLayerTicker() ctrl := gomock.NewController(t) + ts := &testSyncer{ mTicker: mt, mDataFetcher: mocks.NewMockfetchLogic(ctrl), + mAtxSyncer: mocks.NewMockatxSyncer(ctrl), mBeacon: smocks.NewMockBeaconGetter(ctrl), mLyrPatrol: mocks.NewMocklayerPatrol(ctrl), mVm: mmocks.NewMockvmState(ctrl), @@ -114,6 +117,7 @@ func newTestSyncer(t *testing.T, interval time.Duration) *testSyncer { nil, ts.mLyrPatrol, ts.mCertHdr, + ts.mAtxSyncer, WithConfig(cfg), WithLogger(lg), withDataFetcher(ts.mDataFetcher), @@ -165,7 +169,7 @@ func TestSynchronize_OnlyOneSynchronize(t *testing.T) { defer cancel() ts.syncer.Start() - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gomock.Any()).AnyTimes() + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gomock.Any()).AnyTimes() ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()) gLayer := types.GetEffectiveGenesis() @@ -223,7 +227,7 @@ func TestSynchronize_AllGood(t *testing.T) { current := gLayer.Add(10) ts.mTicker.advanceToLayer(current) for epoch := gLayer.GetEpoch(); epoch <= current.GetEpoch(); epoch++ { - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), epoch) + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), epoch) } ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()) for lid := gLayer.Add(1); lid.Before(current); lid = lid.Add(1) { @@ -276,8 +280,8 @@ func TestSynchronize_FetchLayerDataFailed(t *testing.T) { current := gLayer.Add(2) ts.mTicker.advanceToLayer(current) lyr := current.Sub(1) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gLayer.GetEpoch()) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gLayer.GetEpoch()+1) + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gLayer.GetEpoch()) + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gLayer.GetEpoch()+1) ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()) ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lyr).Return(errors.New("meh")) @@ -296,7 +300,7 @@ func TestSynchronize_FetchMalfeasanceFailed(t *testing.T) { current := gLayer.Add(2) ts.mTicker.advanceToLayer(current) lyr := current.Sub(1) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gomock.Any()).AnyTimes() + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gomock.Any()).AnyTimes() ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()).Return(errors.New("meh")) require.False(t, ts.syncer.synchronize(context.Background())) @@ -310,10 +314,10 @@ func TestSynchronize_FailedInitialATXsSync(t *testing.T) { current := types.LayerID(layersPerEpoch * uint32(failedEpoch+1)) ts.mTicker.advanceToLayer(current) for epoch := types.GetEffectiveGenesis().GetEpoch(); epoch < failedEpoch; epoch++ { - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), epoch) + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), epoch) } - ts.mDataFetcher.EXPECT(). - GetEpochATXs(gomock.Any(), failedEpoch). + ts.mAtxSyncer.EXPECT(). + Download(gomock.Any(), failedEpoch). Return(errors.New("no ATXs. should fail sync")) var wg sync.WaitGroup @@ -355,7 +359,7 @@ func startWithSyncedState(t *testing.T, ts *testSyncer) types.LayerID { gLayer := types.GetEffectiveGenesis() ts.mTicker.advanceToLayer(gLayer) ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gLayer.GetEpoch()) + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gLayer.GetEpoch()) require.True(t, ts.syncer.synchronize(context.Background())) require.True(t, ts.syncer.ListenToATXGossip()) require.True(t, ts.syncer.ListenToGossip()) @@ -397,7 +401,7 @@ func TestSyncAtxs_Genesis(t *testing.T) { if tc.lastSynced > 0 { require.False(t, ts.syncer.ListenToATXGossip()) for epoch := types.EpochID(1); epoch <= tc.lastSynced; epoch++ { - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), epoch) + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), epoch) } ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()) } @@ -435,7 +439,7 @@ func TestSyncAtxs(t *testing.T) { require.Equal(t, lyr.GetEpoch()-1, ts.syncer.lastAtxEpoch()) ts.mTicker.advanceToLayer(tc.current) for epoch := lyr.GetEpoch(); epoch <= tc.lastSyncEpoch; epoch++ { - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), epoch) + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), epoch) } for lid := lyr; lid < tc.current; lid++ { ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lid) @@ -451,7 +455,7 @@ func TestSynchronize_StaySyncedUponFailure(t *testing.T) { lyr := startWithSyncedState(t, ts) current := lyr.Add(1) ts.mTicker.advanceToLayer(current) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), current.GetEpoch()) + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), current.GetEpoch()) ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lyr).Return(errors.New("doh")) require.False(t, ts.syncer.synchronize(context.Background())) @@ -478,7 +482,7 @@ func TestSynchronize_BecomeNotSyncedUponFailureIfNoGossip(t *testing.T) { // test the case where the node originally starts from notSynced and eventually becomes synced. func TestFromNotSyncedToSynced(t *testing.T) { ts := newSyncerWithoutPeriodicRuns(t) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gomock.Any()).AnyTimes() + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gomock.Any()).AnyTimes() ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()) lyr := types.GetEffectiveGenesis().Add(1) current := lyr.Add(5) @@ -512,7 +516,7 @@ func TestFromNotSyncedToSynced(t *testing.T) { // to notSynced. func TestFromGossipSyncToNotSynced(t *testing.T) { ts := newSyncerWithoutPeriodicRuns(t) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gomock.Any()).AnyTimes() + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gomock.Any()).AnyTimes() lyr := types.GetEffectiveGenesis().Add(1) current := lyr.Add(1) ts.mTicker.advanceToLayer(current) @@ -546,7 +550,7 @@ func TestNetworkHasNoData(t *testing.T) { lyr := startWithSyncedState(t, ts) require.True(t, ts.syncer.IsSynced(context.Background())) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gomock.Any()).AnyTimes() + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gomock.Any()).AnyTimes() for lid := lyr.Add(1); lid < lyr.Add(outOfSyncThreshold+1); lid++ { ts.mTicker.advanceToLayer(lid) ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), gomock.Any()) @@ -568,7 +572,7 @@ func TestNetworkHasNoData(t *testing.T) { // eventually become synced again. func TestFromSyncedToNotSynced(t *testing.T) { ts := newSyncerWithoutPeriodicRuns(t) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gomock.Any()).AnyTimes() + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gomock.Any()).AnyTimes() ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()).AnyTimes() require.True(t, ts.syncer.synchronize(context.Background())) @@ -619,7 +623,7 @@ func waitOutGossipSync(t *testing.T, ts *testSyncer) { func TestSync_AlsoSyncProcessedLayer(t *testing.T) { ts := newSyncerWithoutPeriodicRuns(t) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gomock.Any()).AnyTimes() + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gomock.Any()).AnyTimes() ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()) lyr := types.GetEffectiveGenesis().Add(1) current := lyr.Add(1) @@ -692,13 +696,14 @@ func TestSynchronize_RecoverFromCheckpoint(t *testing.T) { nil, ts.mLyrPatrol, ts.mCertHdr, + ts.mAtxSyncer, WithConfig(ts.syncer.cfg), WithLogger(ts.syncer.logger), withDataFetcher(ts.mDataFetcher), withForkFinder(ts.mForkFinder), ) // should not sync any atxs before current epoch - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), current.GetEpoch()) + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), current.GetEpoch()) ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()) require.True(t, ts.syncer.synchronize(context.Background())) require.Equal(t, current.GetEpoch(), ts.syncer.lastAtxEpoch()) From 12a8e5a269e17755a191007b8aed91fba1888f4a Mon Sep 17 00:00:00 2001 From: Dmitry Date: Tue, 5 Mar 2024 09:23:05 +0100 Subject: [PATCH 02/10] fix error assertion --- p2p/server/server_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/p2p/server/server_test.go b/p2p/server/server_test.go index aa4c34e807..6f247b2b49 100644 --- a/p2p/server/server_test.go +++ b/p2p/server/server_test.go @@ -67,7 +67,8 @@ func TestServer(t *testing.T) { }) t.Run("ReceiveError", func(t *testing.T) { _, err := client.Request(ctx, mesh.Hosts()[2].ID(), request) - require.Equal(t, err, testErr) + require.ErrorIs(t, err, ErrPeerResponseFailed) + require.ErrorContains(t, err, testErr.Error()) }) t.Run("DialError", func(t *testing.T) { _, err := client.Request(ctx, mesh.Hosts()[2].ID(), request) From a3d060d2ecd26d8cf4c828ea2f7c4a2df126a65e Mon Sep 17 00:00:00 2001 From: Dmitry Date: Tue, 5 Mar 2024 12:00:12 +0100 Subject: [PATCH 03/10] rename fields to publish/target to avoid confusion --- syncer/atxsync/syncer.go | 45 ++++++++++++++++++----------------- syncer/atxsync/syncer_test.go | 14 +++++------ 2 files changed, 30 insertions(+), 29 deletions(-) diff --git a/syncer/atxsync/syncer.go b/syncer/atxsync/syncer.go index 50c2fd50bb..53239ee054 100644 --- a/syncer/atxsync/syncer.go +++ b/syncer/atxsync/syncer.go @@ -97,24 +97,25 @@ type Syncer struct { localdb *localsql.Database } -func (s *Syncer) closeToTheEpoch(epoch types.EpochID, timestamp time.Time) bool { - epochStart := s.clock.LayerToTime(epoch.FirstLayer()) +func (s *Syncer) closeToTheEpoch(publish types.EpochID, timestamp time.Time) bool { + target := publish + 1 + epochStart := s.clock.LayerToTime(target.FirstLayer()) return timestamp.After(epochStart) || epochStart.Sub(timestamp) < 2*s.cfg.EpochInfoInterval } -func (s *Syncer) Download(ctx context.Context, epoch types.EpochID) error { - s.logger.Info("starting atx sync", log.ZContext(ctx), epoch.Field().Zap()) +func (s *Syncer) Download(ctx context.Context, publish types.EpochID) error { + s.logger.Info("starting atx sync", log.ZContext(ctx), publish.Field().Zap()) - state, err := atxsync.GetSyncState(s.localdb, epoch) + state, err := atxsync.GetSyncState(s.localdb, publish) if err != nil { - return fmt.Errorf("failed to get state for epoch %v: %w", epoch, err) + return fmt.Errorf("failed to get state for epoch %v: %w", publish, err) } - lastSuccess, err := atxsync.GetRequestTime(s.localdb, epoch) + lastSuccess, err := atxsync.GetRequestTime(s.localdb, publish) if err != nil && !errors.Is(err, sql.ErrNotFound) { - return fmt.Errorf("failed to get last request time for epoch %v: %w", epoch, err) + return fmt.Errorf("failed to get last request time for epoch %v: %w", publish, err) } // in case of immediate we will request epoch info without waiting EpochInfoInterval - immediate := len(state) == 0 || (errors.Is(err, sql.ErrNotFound) || !s.closeToTheEpoch(epoch, lastSuccess)) + immediate := len(state) == 0 || (errors.Is(err, sql.ErrNotFound) || !s.closeToTheEpoch(publish, lastSuccess)) ctx, cancel := context.WithCancel(ctx) eg, ctx := errgroup.WithContext(ctx) @@ -129,10 +130,10 @@ func (s *Syncer) Download(ctx context.Context, epoch types.EpochID) error { // - all atxs from that epoch have to be downloaded or they are unavailable. // atx is unavailable if it was requested more than RequestsLimit times, and no peer provided it. eg.Go(func() error { - return s.downloadEpochInfo(ctx, epoch, immediate, updates) + return s.downloadEpochInfo(ctx, publish, immediate, updates) }) eg.Go(func() error { - err := s.downloadAtxs(ctx, epoch, state, updates) + err := s.downloadAtxs(ctx, publish, state, updates) cancel() return err }) @@ -141,7 +142,7 @@ func (s *Syncer) Download(ctx context.Context, epoch types.EpochID) error { func (s *Syncer) downloadEpochInfo( ctx context.Context, - epoch types.EpochID, + publish types.EpochID, immediate bool, updates chan epochUpdate, ) error { @@ -164,14 +165,14 @@ func (s *Syncer) downloadEpochInfo( } // do not run it concurrently, epoch info is large and will continue to grow for _, peer := range peers { - epochData, err := s.fetcher.PeerEpochInfo(ctx, peer, epoch) + epochData, err := s.fetcher.PeerEpochInfo(ctx, peer, publish) if err != nil || epochData == nil { if errors.Is(err, context.Canceled) { return nil } s.logger.Warn("failed to download epoch info", log.ZContext(ctx), - epoch.Field().Zap(), + publish.Field().Zap(), zap.String("peer", peer.String()), zap.Error(err), ) @@ -179,7 +180,7 @@ func (s *Syncer) downloadEpochInfo( } s.logger.Info("downloaded epoch info", log.ZContext(ctx), - epoch.Field().Zap(), + publish.Field().Zap(), zap.String("peer", peer.String()), zap.Int("atxs", len(epochData.AtxIDs)), ) @@ -202,7 +203,7 @@ func (s *Syncer) downloadEpochInfo( func (s *Syncer) downloadAtxs( ctx context.Context, - epoch types.EpochID, + publish types.EpochID, state map[types.ATXID]int, updates chan epochUpdate, ) error { @@ -219,11 +220,11 @@ func (s *Syncer) downloadAtxs( for { // waiting for update if there is nothing to download - if nothingToDownload && s.closeToTheEpoch(epoch, lastSuccess) { + if nothingToDownload && s.closeToTheEpoch(publish, lastSuccess) { s.logger.Info( "atx sync terminated", log.ZContext(ctx), - epoch.Field().Zap(), + publish.Field().Zap(), zap.Int("downloaded", len(downloaded)), zap.Int("total", len(state)), zap.Int("unavailable", len(state)-len(downloaded)), @@ -284,7 +285,7 @@ func (s *Syncer) downloadAtxs( s.logger.Info( "atx sync progress", log.ZContext(ctx), - epoch.Field().Zap(), + publish.Field().Zap(), zap.Int("downloaded", len(downloaded)), zap.Int("total", len(state)), zap.Int("progress", int(progress)), @@ -311,12 +312,12 @@ func (s *Syncer) downloadAtxs( } if err := s.localdb.WithTx(ctx, func(tx *sql.Tx) error { - if err := atxsync.SaveRequestTime(tx, epoch, lastSuccess); err != nil { + if err := atxsync.SaveRequestTime(tx, publish, lastSuccess); err != nil { return fmt.Errorf("failed to save request time: %w", err) } - return atxsync.SaveSyncState(tx, epoch, state, s.cfg.RequestsLimit) + return atxsync.SaveSyncState(tx, publish, state, s.cfg.RequestsLimit) }); err != nil { - return fmt.Errorf("failed to persist state for epoch %v: %w", epoch, err) + return fmt.Errorf("failed to persist state for epoch %v: %w", publish, err) } batch = batch[:0] } diff --git a/syncer/atxsync/syncer_test.go b/syncer/atxsync/syncer_test.go index 1f3a4a4440..9d0188197f 100644 --- a/syncer/atxsync/syncer_test.go +++ b/syncer/atxsync/syncer_test.go @@ -100,7 +100,7 @@ func TestSyncer(t *testing.T) { }).AnyTimes() past := time.Now().Add(-time.Minute) - tester.clock.EXPECT().LayerToTime(publish.FirstLayer()).Return(past).AnyTimes() + tester.clock.EXPECT().LayerToTime((publish + 1).FirstLayer()).Return(past).AnyTimes() require.NoError(t, tester.syncer.Download(context.Background(), publish)) }) t.Run("interruptible", func(t *testing.T) { @@ -108,13 +108,13 @@ func TestSyncer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() publish := types.EpochID(1) - tester.clock.EXPECT().LayerToTime(publish.FirstLayer()).Return(time.Now()).AnyTimes() + tester.clock.EXPECT().LayerToTime((publish + 1).FirstLayer()).Return(time.Now()).AnyTimes() require.NoError(t, tester.syncer.Download(ctx, publish)) }) t.Run("error on no peers", func(t *testing.T) { tester := newTester(t, DefaultConfig()) publish := types.EpochID(1) - tester.clock.EXPECT().LayerToTime(publish.FirstLayer()).Return(time.Now()).AnyTimes() + tester.clock.EXPECT().LayerToTime((publish + 1).FirstLayer()).Return(time.Now()).AnyTimes() tester.fetcher.EXPECT().SelectBestShuffled(tester.cfg.EpochInfoPeers).Return(nil) require.ErrorContains(t, tester.syncer.Download(context.Background(), publish), "no peers available") }) @@ -122,7 +122,7 @@ func TestSyncer(t *testing.T) { tester := newTester(t, DefaultConfig()) publish := types.EpochID(2) now := time.Now() - tester.clock.EXPECT().LayerToTime(publish.FirstLayer()).Return(now).AnyTimes() + tester.clock.EXPECT().LayerToTime((publish + 1).FirstLayer()).Return(now).AnyTimes() state := map[types.ATXID]int{ aid("1"): 0, @@ -162,7 +162,7 @@ func TestSyncer(t *testing.T) { }).AnyTimes() past := time.Now().Add(-time.Minute) - tester.clock.EXPECT().LayerToTime(publish.FirstLayer()).Return(past).AnyTimes() + tester.clock.EXPECT().LayerToTime((publish + 1).FirstLayer()).Return(past).AnyTimes() require.NoError(t, tester.syncer.Download(ctx, publish)) }) t.Run("give up on atx after max retries", func(t *testing.T) { @@ -206,7 +206,7 @@ func TestSyncer(t *testing.T) { AnyTimes() past := time.Now().Add(-time.Minute) - tester.clock.EXPECT().LayerToTime(publish.FirstLayer()).Return(past).AnyTimes() + tester.clock.EXPECT().LayerToTime((publish + 1).FirstLayer()).Return(past).AnyTimes() require.NoError(t, tester.syncer.Download(context.Background(), publish)) state, err := atxsync.GetSyncState(tester.localdb, publish) @@ -219,7 +219,7 @@ func TestSyncer(t *testing.T) { tester := newTester(t, DefaultConfig()) publish := types.EpochID(2) now := time.Now() - tester.clock.EXPECT().LayerToTime(publish.FirstLayer()).Return(now).AnyTimes() + tester.clock.EXPECT().LayerToTime((publish + 1).FirstLayer()).Return(now).AnyTimes() peers := []p2p.Peer{"a"} tester.fetcher.EXPECT().SelectBestShuffled(tester.cfg.EpochInfoPeers).Return(peers).AnyTimes() tester.fetcher.EXPECT().PeerEpochInfo(gomock.Any(), peers[0], publish).Return(edata(), nil) From 6402c8118487ad258e6d7641e9122e804b38d478 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Thu, 7 Mar 2024 05:32:43 +0100 Subject: [PATCH 04/10] fix review comments --- syncer/atxsync/syncer.go | 30 ++++++++++++++++++++++-------- syncer/atxsync/syncer_test.go | 2 +- syncer/syncer.go | 4 +++- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/syncer/atxsync/syncer.go b/syncer/atxsync/syncer.go index 53239ee054..dea1f52959 100644 --- a/syncer/atxsync/syncer.go +++ b/syncer/atxsync/syncer.go @@ -13,6 +13,7 @@ import ( "github.com/spacemeshos/go-spacemesh/fetch" "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/p2p" + "github.com/spacemeshos/go-spacemesh/p2p/pubsub" "github.com/spacemeshos/go-spacemesh/p2p/server" "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/atxs" @@ -47,6 +48,8 @@ func DefaultConfig() Config { AtxsBatch: 1000, RequestsLimit: 20, EpochInfoPeers: 2, + ProgressFraction: 0.1, + ProgressInterval: 20 * time.Minute, } } @@ -65,6 +68,11 @@ type Config struct { // AtxsBatch is the maximum number of atxs to sync in a single request. AtxsBatch int `mapstructure:"atxs-batch"` + + // ProgressFraction will report progress every fraction from total is downloaded. + ProgressFraction float64 `mapstructure:"progress-every-fraction"` + // ProgressInterval will report progress every interval. + ProgressInterval time.Duration `mapstructure:"progress-on-time"` } func WithConfig(cfg Config) Opt { @@ -103,8 +111,8 @@ func (s *Syncer) closeToTheEpoch(publish types.EpochID, timestamp time.Time) boo return timestamp.After(epochStart) || epochStart.Sub(timestamp) < 2*s.cfg.EpochInfoInterval } -func (s *Syncer) Download(ctx context.Context, publish types.EpochID) error { - s.logger.Info("starting atx sync", log.ZContext(ctx), publish.Field().Zap()) +func (s *Syncer) Download(parent context.Context, publish types.EpochID) error { + s.logger.Info("starting atx sync", log.ZContext(parent), publish.Field().Zap()) state, err := atxsync.GetSyncState(s.localdb, publish) if err != nil { @@ -117,7 +125,7 @@ func (s *Syncer) Download(ctx context.Context, publish types.EpochID) error { // in case of immediate we will request epoch info without waiting EpochInfoInterval immediate := len(state) == 0 || (errors.Is(err, sql.ErrNotFound) || !s.closeToTheEpoch(publish, lastSuccess)) - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(parent) eg, ctx := errgroup.WithContext(ctx) updates := make(chan epochUpdate, s.cfg.EpochInfoPeers) if len(state) == 0 { @@ -137,7 +145,10 @@ func (s *Syncer) Download(ctx context.Context, publish types.EpochID) error { cancel() return err }) - return eg.Wait() + if err := eg.Wait(); err != nil { + return err + } + return parent.Err() } func (s *Syncer) downloadEpochInfo( @@ -222,7 +233,7 @@ func (s *Syncer) downloadAtxs( // waiting for update if there is nothing to download if nothingToDownload && s.closeToTheEpoch(publish, lastSuccess) { s.logger.Info( - "atx sync terminated", + "atx sync completed", log.ZContext(ctx), publish.Field().Zap(), zap.Int("downloaded", len(downloaded)), @@ -279,9 +290,10 @@ func (s *Syncer) downloadAtxs( } nothingToDownload = len(batch) == 0 - // report progress every 10% or every 20 minutes - if progress := float64(len(downloaded) - previouslyDownloaded); progress/float64(len(state)) > 0.1 || - time.Since(progressTimestamp) > 20*time.Minute { + if progress := float64(len(downloaded) - previouslyDownloaded); progress/float64( + len(state), + ) > s.cfg.ProgressFraction && s.cfg.ProgressFraction != 0 || + time.Since(progressTimestamp) > s.cfg.ProgressInterval && s.cfg.ProgressInterval != 0 { s.logger.Info( "atx sync progress", log.ZContext(ctx), @@ -305,6 +317,8 @@ func (s *Syncer) downloadAtxs( for hash, err := range batchError.Errors { if errors.Is(err, server.ErrPeerResponseFailed) { state[types.ATXID(hash)]++ + } else if errors.Is(err, pubsub.ErrValidationReject) { + state[types.ATXID(hash)] = s.cfg.RequestsLimit } } } diff --git a/syncer/atxsync/syncer_test.go b/syncer/atxsync/syncer_test.go index 9d0188197f..9508a062c6 100644 --- a/syncer/atxsync/syncer_test.go +++ b/syncer/atxsync/syncer_test.go @@ -109,7 +109,7 @@ func TestSyncer(t *testing.T) { cancel() publish := types.EpochID(1) tester.clock.EXPECT().LayerToTime((publish + 1).FirstLayer()).Return(time.Now()).AnyTimes() - require.NoError(t, tester.syncer.Download(ctx, publish)) + require.ErrorIs(t, tester.syncer.Download(ctx, publish), context.Canceled) }) t.Run("error on no peers", func(t *testing.T) { tester := newTester(t, DefaultConfig()) diff --git a/syncer/syncer.go b/syncer/syncer.go index e11265c719..32850695d7 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -383,7 +383,9 @@ func (s *Syncer) synchronize(ctx context.Context) bool { } if err := s.syncAtx(ctx); err != nil { - s.logger.With().Error("failed to sync atxs", log.Context(ctx), log.Err(err)) + if !errors.Is(err, context.Canceled) { + s.logger.With().Error("failed to sync atxs", log.Context(ctx), log.Err(err)) + } return false } From 875239172171393c1266b7712dc2becd3de4fa3c Mon Sep 17 00:00:00 2001 From: Dmitry Date: Thu, 7 Mar 2024 05:46:47 +0100 Subject: [PATCH 05/10] clear atxsync --- checkpoint/recovery.go | 7 +++++++ sql/atxsync/atxsync.go | 12 ++++++++++++ sql/atxsync/atxsync_test.go | 22 ++++++++++++++++++++++ 3 files changed, 41 insertions(+) diff --git a/checkpoint/recovery.go b/checkpoint/recovery.go index 63d2bcb292..16823b1fd4 100644 --- a/checkpoint/recovery.go +++ b/checkpoint/recovery.go @@ -20,6 +20,7 @@ import ( "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/accounts" "github.com/spacemeshos/go-spacemesh/sql/atxs" + "github.com/spacemeshos/go-spacemesh/sql/atxsync" "github.com/spacemeshos/go-spacemesh/sql/localsql" "github.com/spacemeshos/go-spacemesh/sql/localsql/nipost" "github.com/spacemeshos/go-spacemesh/sql/poets" @@ -115,6 +116,12 @@ func Recover( return nil, fmt.Errorf("open old local database: %w", err) } defer localDB.Close() + logger.With().Info("clearing atx sync metadata from local database") + if err := localDB.WithTx(ctx, func(tx *sql.Tx) error { + return atxsync.Clear(tx) + }); err != nil { + return nil, fmt.Errorf("clear atxsync: %w", err) + } preserve, err := RecoverWithDb(ctx, logger, db, localDB, fs, cfg) switch { case errors.Is(err, ErrCheckpointNotFound): diff --git a/sql/atxsync/atxsync.go b/sql/atxsync/atxsync.go index baaa08b511..5518b8a9c0 100644 --- a/sql/atxsync/atxsync.go +++ b/sql/atxsync/atxsync.go @@ -80,3 +80,15 @@ func GetRequestTime(db sql.Executor, epoch types.EpochID) (time.Time, error) { } return timestamp, nil } + +func Clear(db sql.Executor) error { + _, err := db.Exec(`delete from atx_sync_state`, nil, nil) + if err != nil { + return fmt.Errorf("clear atx sync state failed: %w", err) + } + _, err = db.Exec(`delete from atx_sync_requests`, nil, nil) + if err != nil { + return fmt.Errorf("clear atx sync requests failed: %w", err) + } + return nil +} diff --git a/sql/atxsync/atxsync_test.go b/sql/atxsync/atxsync_test.go index dbb5183ecd..24b8695129 100644 --- a/sql/atxsync/atxsync_test.go +++ b/sql/atxsync/atxsync_test.go @@ -80,3 +80,25 @@ func TestRequestTime(t *testing.T) { } } } + +func TestClear(t *testing.T) { + db := localsql.InMemory() + // add some state + for epoch := types.EpochID(1); epoch < types.EpochID(5); epoch++ { + states := map[types.ATXID]int{} + const size = 100 + for i := 0; i < size; i++ { + id := types.ATXID{} + binary.BigEndian.PutUint64(id[:], uint64(i)) + states[id] = 0 + } + require.NoError(t, SaveSyncState(db, epoch, states, 1)) + require.NoError(t, SaveRequestTime(db, epoch, time.Now())) + } + require.NoError(t, Clear(db)) + for epoch := types.EpochID(1); epoch < types.EpochID(5); epoch++ { + state, err := GetSyncState(db, epoch) + require.NoError(t, err) + require.Empty(t, state) + } +} From 9e7a05992b987ed8fee4c5e0b80994bddf8b7750 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Thu, 7 Mar 2024 06:01:51 +0100 Subject: [PATCH 06/10] more review comments --- syncer/atxsync/syncer.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/syncer/atxsync/syncer.go b/syncer/atxsync/syncer.go index dea1f52959..88356f08c5 100644 --- a/syncer/atxsync/syncer.go +++ b/syncer/atxsync/syncer.go @@ -155,13 +155,20 @@ func (s *Syncer) downloadEpochInfo( ctx context.Context, publish types.EpochID, immediate bool, - updates chan epochUpdate, + updates chan<- epochUpdate, ) error { interval := s.cfg.EpochInfoInterval if immediate { interval = 0 } for { + if interval != 0 { + s.logger.Debug( + "waiting between epoch info requests", + publish.Field().Zap(), + zap.Duration("duration", interval), + ) + } select { case <-ctx.Done(): return nil @@ -216,11 +223,10 @@ func (s *Syncer) downloadAtxs( ctx context.Context, publish types.EpochID, state map[types.ATXID]int, - updates chan epochUpdate, + updates <-chan epochUpdate, ) error { - batch := make([]types.ATXID, 0, s.cfg.AtxsBatch) - batch = batch[:0] var ( + batch = make([]types.ATXID, 0, s.cfg.AtxsBatch) downloaded = map[types.ATXID]bool{} previouslyDownloaded = 0 start = time.Now() From d9658d2a7715e09e0ffcf057bea10cb5a31d339b Mon Sep 17 00:00:00 2001 From: Dmitry Date: Thu, 7 Mar 2024 08:20:26 +0100 Subject: [PATCH 07/10] fix config --- config/mainnet.go | 2 ++ config/presets/testnet.go | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/config/mainnet.go b/config/mainnet.go index 90e85e1ca2..7879016cf9 100644 --- a/config/mainnet.go +++ b/config/mainnet.go @@ -23,6 +23,7 @@ import ( "github.com/spacemeshos/go-spacemesh/hare3/eligibility" "github.com/spacemeshos/go-spacemesh/p2p" "github.com/spacemeshos/go-spacemesh/syncer" + "github.com/spacemeshos/go-spacemesh/syncer/atxsync" timeConfig "github.com/spacemeshos/go-spacemesh/timesync/config" "github.com/spacemeshos/go-spacemesh/tortoise" ) @@ -186,6 +187,7 @@ func MainnetConfig() Config { OutOfSyncThresholdLayers: 36, // 3h DisableMeshAgreement: true, DisableAtxReconciliation: true, + AtxSync: atxsync.DefaultConfig(), }, Recovery: checkpoint.DefaultConfig(), Cache: datastore.DefaultConfig(), diff --git a/config/presets/testnet.go b/config/presets/testnet.go index b6f4fbba4a..f450f3678b 100644 --- a/config/presets/testnet.go +++ b/config/presets/testnet.go @@ -24,6 +24,7 @@ import ( "github.com/spacemeshos/go-spacemesh/hare3/eligibility" "github.com/spacemeshos/go-spacemesh/p2p" "github.com/spacemeshos/go-spacemesh/syncer" + "github.com/spacemeshos/go-spacemesh/syncer/atxsync" timeConfig "github.com/spacemeshos/go-spacemesh/timesync/config" "github.com/spacemeshos/go-spacemesh/tortoise" ) @@ -137,10 +138,11 @@ func testnet() config.Config { LOGGING: config.DefaultLoggingConfig(), Sync: syncer.Config{ Interval: time.Minute, - EpochEndFraction: 0.8, + EpochEndFraction: 0.5, MaxStaleDuration: time.Hour, GossipDuration: 50 * time.Second, OutOfSyncThresholdLayers: 10, + AtxSync: atxsync.DefaultConfig(), }, Recovery: checkpoint.DefaultConfig(), Cache: datastore.DefaultConfig(), From 3a8f1b837d7d397fb2b42b9323e6231d7981aeb7 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Thu, 7 Mar 2024 13:31:23 +0100 Subject: [PATCH 08/10] interruptible could make calls depending on concurrency --- syncer/atxsync/syncer_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/syncer/atxsync/syncer_test.go b/syncer/atxsync/syncer_test.go index 9508a062c6..5f1621bdce 100644 --- a/syncer/atxsync/syncer_test.go +++ b/syncer/atxsync/syncer_test.go @@ -108,7 +108,11 @@ func TestSyncer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() publish := types.EpochID(1) - tester.clock.EXPECT().LayerToTime((publish + 1).FirstLayer()).Return(time.Now()).AnyTimes() + now := time.Now() + tester.fetcher.EXPECT().SelectBestShuffled(tester.cfg.EpochInfoPeers).Return([]p2p.Peer{"a"}).AnyTimes() + tester.fetcher.EXPECT().PeerEpochInfo(gomock.Any(), gomock.Any(), publish).Return(edata("1"), nil).AnyTimes() + tester.fetcher.EXPECT().GetAtxs(gomock.Any(), gomock.Any()).Return(errors.New("no atxs")).AnyTimes() + tester.clock.EXPECT().LayerToTime((publish + 1).FirstLayer()).Return(now).AnyTimes() require.ErrorIs(t, tester.syncer.Download(ctx, publish), context.Canceled) }) t.Run("error on no peers", func(t *testing.T) { From 712fde6efa58b4e802a04c3f4ba97d4f30a5abbc Mon Sep 17 00:00:00 2001 From: Dmitry Date: Thu, 7 Mar 2024 14:12:08 +0100 Subject: [PATCH 09/10] use correct error and remove marhshal log object as it doesn't work with errors --- fetch/fetch.go | 6 +++--- fetch/mesh_data.go | 15 --------------- p2p/server/server.go | 10 +++------- p2p/server/server_test.go | 2 +- syncer/atxsync/syncer.go | 3 +-- syncer/atxsync/syncer_test.go | 3 +-- 6 files changed, 9 insertions(+), 30 deletions(-) diff --git a/fetch/fetch.go b/fetch/fetch.go index e79f0d7077..4f356bf1df 100644 --- a/fetch/fetch.go +++ b/fetch/fetch.go @@ -36,9 +36,9 @@ const ( ) var ( - // errExceedMaxRetries is returned when MaxRetriesForRequest attempts has been made to fetch + // ErrExceedMaxRetries is returned when MaxRetriesForRequest attempts has been made to fetch // data for a hash and failed. - errExceedMaxRetries = errors.New("fetch failed after max retries for request") + ErrExceedMaxRetries = errors.New("fetch failed after max retries for request") errValidatorsNotSet = errors.New("validators not set") ) @@ -534,7 +534,7 @@ func (f *Fetch) failAfterRetry(hash types.Hash32) { log.Stringer("hash", req.hash), log.Int("retries", req.retries), ) - req.promise.err = errExceedMaxRetries + req.promise.err = ErrExceedMaxRetries close(req.promise.completed) } else { // put the request back to the unprocessed list diff --git a/fetch/mesh_data.go b/fetch/mesh_data.go index 0e82723b85..46245b3c22 100644 --- a/fetch/mesh_data.go +++ b/fetch/mesh_data.go @@ -7,7 +7,6 @@ import ( "strings" "sync" - "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" "github.com/spacemeshos/go-spacemesh/activation" @@ -347,17 +346,3 @@ func (b *BatchError) Error() string { } return builder.String() } - -func (b *BatchError) MarshalLogObject(encoder log.ObjectEncoder) error { - encoder.AddArray("errors", log.ArrayMarshalerFunc(func(encoder log.ArrayEncoder) error { - for hash, err := range b.Errors { - encoder.AppendObject(zapcore.ObjectMarshalerFunc(func(encoder log.ObjectEncoder) error { - encoder.AddString("id", hash.ShortString()) - encoder.AddString("error", err.Error()) - return nil - })) - } - return nil - })) - return nil -} diff --git a/p2p/server/server.go b/p2p/server/server.go index be90fb6553..6999e20d0b 100644 --- a/p2p/server/server.go +++ b/p2p/server/server.go @@ -28,12 +28,8 @@ type DecayingTagSpec struct { Cap int `mapstructure:"cap"` } -var ( - // ErrNotConnected is returned when peer is not connected. - ErrNotConnected = errors.New("peer is not connected") - // ErrPeerResponseFailed raised if peer responded with an error. - ErrPeerResponseFailed = errors.New("peer response failed") -) +// ErrNotConnected is returned when peer is not connected. +var ErrNotConnected = errors.New("peer is not connected") // Opt is a type to configure a server. type Opt func(s *Server) @@ -347,7 +343,7 @@ func (s *Server) Request(ctx context.Context, pid peer.ID, req []byte) ([]byte, s.metrics.clientServerError.Inc() s.metrics.clientLatency.Observe(took) } - return nil, fmt.Errorf("%w: %s", ErrPeerResponseFailed, data.Error) + return nil, fmt.Errorf("peer error: %s", data.Error) case s.metrics != nil: s.metrics.clientSucceeded.Inc() s.metrics.clientLatency.Observe(took) diff --git a/p2p/server/server_test.go b/p2p/server/server_test.go index 6f247b2b49..dd6439d84f 100644 --- a/p2p/server/server_test.go +++ b/p2p/server/server_test.go @@ -67,7 +67,7 @@ func TestServer(t *testing.T) { }) t.Run("ReceiveError", func(t *testing.T) { _, err := client.Request(ctx, mesh.Hosts()[2].ID(), request) - require.ErrorIs(t, err, ErrPeerResponseFailed) + require.ErrorContains(t, err, "peer error") require.ErrorContains(t, err, testErr.Error()) }) t.Run("DialError", func(t *testing.T) { diff --git a/syncer/atxsync/syncer.go b/syncer/atxsync/syncer.go index 88356f08c5..0344be3213 100644 --- a/syncer/atxsync/syncer.go +++ b/syncer/atxsync/syncer.go @@ -14,7 +14,6 @@ import ( "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/p2p" "github.com/spacemeshos/go-spacemesh/p2p/pubsub" - "github.com/spacemeshos/go-spacemesh/p2p/server" "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/atxs" "github.com/spacemeshos/go-spacemesh/sql/atxsync" @@ -321,7 +320,7 @@ func (s *Syncer) downloadAtxs( batchError := &fetch.BatchError{} if errors.As(err, &batchError) { for hash, err := range batchError.Errors { - if errors.Is(err, server.ErrPeerResponseFailed) { + if errors.Is(err, fetch.ErrExceedMaxRetries) { state[types.ATXID(hash)]++ } else if errors.Is(err, pubsub.ErrValidationReject) { state[types.ATXID(hash)] = s.cfg.RequestsLimit diff --git a/syncer/atxsync/syncer_test.go b/syncer/atxsync/syncer_test.go index 5f1621bdce..05fff925bf 100644 --- a/syncer/atxsync/syncer_test.go +++ b/syncer/atxsync/syncer_test.go @@ -14,7 +14,6 @@ import ( "github.com/spacemeshos/go-spacemesh/fetch" "github.com/spacemeshos/go-spacemesh/log/logtest" "github.com/spacemeshos/go-spacemesh/p2p" - "github.com/spacemeshos/go-spacemesh/p2p/server" "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/atxs" "github.com/spacemeshos/go-spacemesh/sql/atxsync" @@ -198,7 +197,7 @@ func TestSyncer(t *testing.T) { } for _, bad := range bad.AtxIDs { if bad == id { - berr.Add(bad.Hash32(), fmt.Errorf("%w: test", server.ErrPeerResponseFailed)) + berr.Add(bad.Hash32(), fmt.Errorf("%w: test", fetch.ErrExceedMaxRetries)) } } } From 11d1a45f30f45b0ec5bff532d53c96e40499ca36 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Thu, 7 Mar 2024 19:25:40 +0100 Subject: [PATCH 10/10] protect from accidental error --- syncer/atxsync/syncer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/syncer/atxsync/syncer.go b/syncer/atxsync/syncer.go index 0344be3213..05365b641f 100644 --- a/syncer/atxsync/syncer.go +++ b/syncer/atxsync/syncer.go @@ -320,6 +320,9 @@ func (s *Syncer) downloadAtxs( batchError := &fetch.BatchError{} if errors.As(err, &batchError) { for hash, err := range batchError.Errors { + if _, exists := state[types.ATXID(hash)]; !exists { + continue + } if errors.Is(err, fetch.ErrExceedMaxRetries) { state[types.ATXID(hash)]++ } else if errors.Is(err, pubsub.ErrValidationReject) {