diff --git a/checkpoint/recovery.go b/checkpoint/recovery.go index 9a4af619484..fcb2f7f6b89 100644 --- a/checkpoint/recovery.go +++ b/checkpoint/recovery.go @@ -20,6 +20,7 @@ import ( "github.com/spacemeshos/go-spacemesh/sql" "github.com/spacemeshos/go-spacemesh/sql/accounts" "github.com/spacemeshos/go-spacemesh/sql/atxs" + "github.com/spacemeshos/go-spacemesh/sql/atxsync" "github.com/spacemeshos/go-spacemesh/sql/localsql" "github.com/spacemeshos/go-spacemesh/sql/localsql/nipost" "github.com/spacemeshos/go-spacemesh/sql/poets" @@ -115,6 +116,12 @@ func Recover( return nil, fmt.Errorf("open old local database: %w", err) } defer localDB.Close() + logger.With().Info("clearing atx sync metadata from local database") + if err := localDB.WithTx(ctx, func(tx *sql.Tx) error { + return atxsync.Clear(tx) + }); err != nil { + return nil, fmt.Errorf("clear atxsync: %w", err) + } preserve, err := RecoverWithDb(ctx, logger, db, localDB, fs, cfg) switch { case errors.Is(err, ErrCheckpointNotFound): diff --git a/config/mainnet.go b/config/mainnet.go index 90e85e1ca2c..7879016cf9f 100644 --- a/config/mainnet.go +++ b/config/mainnet.go @@ -23,6 +23,7 @@ import ( "github.com/spacemeshos/go-spacemesh/hare3/eligibility" "github.com/spacemeshos/go-spacemesh/p2p" "github.com/spacemeshos/go-spacemesh/syncer" + "github.com/spacemeshos/go-spacemesh/syncer/atxsync" timeConfig "github.com/spacemeshos/go-spacemesh/timesync/config" "github.com/spacemeshos/go-spacemesh/tortoise" ) @@ -186,6 +187,7 @@ func MainnetConfig() Config { OutOfSyncThresholdLayers: 36, // 3h DisableMeshAgreement: true, DisableAtxReconciliation: true, + AtxSync: atxsync.DefaultConfig(), }, Recovery: checkpoint.DefaultConfig(), Cache: datastore.DefaultConfig(), diff --git a/config/presets/fastnet.go b/config/presets/fastnet.go index 33655137450..9ef40d844c7 100644 --- a/config/presets/fastnet.go +++ b/config/presets/fastnet.go @@ -51,6 +51,7 @@ func fastnet() config.Config { conf.LayerDuration = 15 * time.Second conf.Sync.Interval = 5 * time.Second conf.Sync.GossipDuration = 10 * time.Second + conf.Sync.AtxSync.EpochInfoInterval = 20 * time.Second conf.LayersPerEpoch = 4 conf.RegossipAtxInterval = 30 * time.Second diff --git a/config/presets/testnet.go b/config/presets/testnet.go index b6f4fbba4a1..f450f3678b6 100644 --- a/config/presets/testnet.go +++ b/config/presets/testnet.go @@ -24,6 +24,7 @@ import ( "github.com/spacemeshos/go-spacemesh/hare3/eligibility" "github.com/spacemeshos/go-spacemesh/p2p" "github.com/spacemeshos/go-spacemesh/syncer" + "github.com/spacemeshos/go-spacemesh/syncer/atxsync" timeConfig "github.com/spacemeshos/go-spacemesh/timesync/config" "github.com/spacemeshos/go-spacemesh/tortoise" ) @@ -137,10 +138,11 @@ func testnet() config.Config { LOGGING: config.DefaultLoggingConfig(), Sync: syncer.Config{ Interval: time.Minute, - EpochEndFraction: 0.8, + EpochEndFraction: 0.5, MaxStaleDuration: time.Hour, GossipDuration: 50 * time.Second, OutOfSyncThresholdLayers: 10, + AtxSync: atxsync.DefaultConfig(), }, Recovery: checkpoint.DefaultConfig(), Cache: datastore.DefaultConfig(), diff --git a/fetch/mesh_data.go b/fetch/mesh_data.go index 77c467fa207..0e82723b851 100644 --- a/fetch/mesh_data.go +++ b/fetch/mesh_data.go @@ -4,8 +4,10 @@ import ( "context" "errors" "fmt" - "sync/atomic" + "strings" + "sync" + "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" 
"github.com/spacemeshos/go-spacemesh/activation" @@ -66,8 +68,11 @@ func (f *Fetch) getHashes( pendingMetric := pendingHashReqs.WithLabelValues(string(hint)) pendingMetric.Add(float64(len(hashes))) - var eg errgroup.Group - var failed atomic.Uint64 + var ( + eg errgroup.Group + mu sync.Mutex + bfailure = BatchError{Errors: map[types.Hash32]error{}} + ) for i, hash := range hashes { if err := options.limiter.Acquire(ctx, 1); err != nil { pendingMetric.Add(float64(i - len(hashes))) @@ -97,12 +102,15 @@ func (f *Fetch) getHashes( options.limiter.Release(1) pendingMetric.Add(-1) if p.err != nil { - f.logger.Debug("failed to get hash", + f.logger.With().Debug("failed to get hash", log.String("hint", string(hint)), log.Stringer("hash", h), log.Err(p.err), ) - failed.Add(1) + + mu.Lock() + bfailure.Add(h, p.err) + mu.Unlock() } return nil } @@ -110,8 +118,8 @@ func (f *Fetch) getHashes( } eg.Wait() - if failed.Load() > 0 { - return fmt.Errorf("failed to fetch %d hashes out of %d", failed.Load(), len(hashes)) + if !bfailure.Empty() { + return &bfailure } return nil } @@ -313,3 +321,43 @@ func (f *Fetch) GetCert( } return nil, fmt.Errorf("failed to get cert %v/%s from %d peers: %w", lid, bid.String(), len(peers), ctx.Err()) } + +type BatchError struct { + Errors map[types.Hash32]error +} + +func (b *BatchError) Empty() bool { + return len(b.Errors) == 0 +} + +func (b *BatchError) Add(id types.Hash32, err error) { + if b.Errors == nil { + b.Errors = map[types.Hash32]error{} + } + b.Errors[id] = err +} + +func (b *BatchError) Error() string { + var builder strings.Builder + builder.WriteString("batch failure: ") + for hash, err := range b.Errors { + builder.WriteString(hash.ShortString()) + builder.WriteString("=") + builder.WriteString(err.Error()) + } + return builder.String() +} + +func (b *BatchError) MarshalLogObject(encoder log.ObjectEncoder) error { + encoder.AddArray("errors", log.ArrayMarshalerFunc(func(encoder log.ArrayEncoder) error { + for hash, err := range b.Errors { + encoder.AppendObject(zapcore.ObjectMarshalerFunc(func(encoder log.ObjectEncoder) error { + encoder.AddString("id", hash.ShortString()) + encoder.AddString("error", err.Error()) + return nil + })) + } + return nil + })) + return nil +} diff --git a/log/zap.go b/log/zap.go index 9d5fd7d008b..c0a67419ea4 100644 --- a/log/zap.go +++ b/log/zap.go @@ -198,6 +198,14 @@ func ZContext(ctx context.Context) zap.Field { return zap.Inline(&marshalledContext{Context: ctx}) } +func NiceZapError(err error) zap.Field { + var loggable ObjectMarshaller + if errors.As(err, &loggable) { + return zap.Inline(loggable) + } + return zap.Error(err) +} + func Any(key string, value any) Field { return Field(zap.Any(key, value)) } diff --git a/node/node.go b/node/node.go index 3863f0dffea..a53e1104159 100644 --- a/node/node.go +++ b/node/node.go @@ -861,6 +861,10 @@ func (app *App) initServices(ctx context.Context) error { fetcher, patrol, app.certifier, + atxsync.New(fetcher, app.clock, app.db, app.localDB, + atxsync.WithConfig(app.Config.Sync.AtxSync), + atxsync.WithLogger(app.syncLogger.Zap()), + ), syncer.WithConfig(syncerConf), syncer.WithLogger(app.syncLogger), ) diff --git a/p2p/server/server.go b/p2p/server/server.go index 84cf1779a55..be90fb65534 100644 --- a/p2p/server/server.go +++ b/p2p/server/server.go @@ -28,8 +28,12 @@ type DecayingTagSpec struct { Cap int `mapstructure:"cap"` } -// ErrNotConnected is returned when peer is not connected. 
-var ErrNotConnected = errors.New("peer is not connected")
+var (
+	// ErrNotConnected is returned when peer is not connected.
+	ErrNotConnected = errors.New("peer is not connected")
+	// ErrPeerResponseFailed is returned when a peer responds with an error.
+	ErrPeerResponseFailed = errors.New("peer response failed")
+)
 
 // Opt is a type to configure a server.
 type Opt func(s *Server)
@@ -343,7 +347,7 @@ func (s *Server) Request(ctx context.Context, pid peer.ID, req []byte) ([]byte,
 			s.metrics.clientServerError.Inc()
 			s.metrics.clientLatency.Observe(took)
 		}
-		return nil, errors.New(data.Error)
+		return nil, fmt.Errorf("%w: %s", ErrPeerResponseFailed, data.Error)
 	case s.metrics != nil:
 		s.metrics.clientSucceeded.Inc()
 		s.metrics.clientLatency.Observe(took)
diff --git a/p2p/server/server_test.go b/p2p/server/server_test.go
index aa4c34e8077..6f247b2b49f 100644
--- a/p2p/server/server_test.go
+++ b/p2p/server/server_test.go
@@ -67,7 +67,8 @@ func TestServer(t *testing.T) {
 	})
 	t.Run("ReceiveError", func(t *testing.T) {
 		_, err := client.Request(ctx, mesh.Hosts()[2].ID(), request)
-		require.Equal(t, err, testErr)
+		require.ErrorIs(t, err, ErrPeerResponseFailed)
+		require.ErrorContains(t, err, testErr.Error())
 	})
 	t.Run("DialError", func(t *testing.T) {
 		_, err := client.Request(ctx, mesh.Hosts()[2].ID(), request)
diff --git a/sql/atxsync/atxsync.go b/sql/atxsync/atxsync.go
new file mode 100644
index 00000000000..5518b8a9c00
--- /dev/null
+++ b/sql/atxsync/atxsync.go
@@ -0,0 +1,94 @@
+package atxsync
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/spacemeshos/go-spacemesh/common/types"
+	"github.com/spacemeshos/go-spacemesh/sql"
+)
+
+func GetSyncState(db sql.Executor, epoch types.EpochID) (map[types.ATXID]int, error) {
+	states := map[types.ATXID]int{}
+	_, err := db.Exec("select id, requests from atx_sync_state where epoch = ?1",
+		func(stmt *sql.Statement) {
+			stmt.BindInt64(1, int64(epoch))
+		}, func(stmt *sql.Statement) bool {
+			var id types.ATXID
+			stmt.ColumnBytes(0, id[:])
+			states[id] = int(stmt.ColumnInt64(1))
+			return true
+		})
+	if err != nil {
+		return nil, fmt.Errorf("select synced atx ids for epoch %v failed: %w", epoch, err)
+	}
+	return states, nil
+}
+
+func SaveSyncState(db sql.Executor, epoch types.EpochID, states map[types.ATXID]int, max int) error {
+	for id, requests := range states {
+		var err error
+		if requests == max {
+			_, err = db.Exec(`delete from atx_sync_state where epoch = ?1 and id = ?2`, func(stmt *sql.Statement) {
+				stmt.BindInt64(1, int64(epoch))
+				stmt.BindBytes(2, id[:])
+			}, nil)
+		} else {
+			_, err = db.Exec(`insert into atx_sync_state
+			(epoch, id, requests) values (?1, ?2, ?3)
+			on conflict(epoch, id) do update set requests = ?3;`,
+				func(stmt *sql.Statement) {
+					stmt.BindInt64(1, int64(epoch))
+					stmt.BindBytes(2, id[:])
+					stmt.BindInt64(3, int64(requests))
+				}, nil)
+		}
+		if err != nil {
+			return fmt.Errorf("insert synced atx id %v/%v failed: %w", epoch, id.ShortString(), err)
+		}
+	}
+	return nil
+}
+
+func SaveRequestTime(db sql.Executor, epoch types.EpochID, timestamp time.Time) error {
+	_, err := db.Exec(`insert into atx_sync_requests
+	(epoch, timestamp) values (?1, ?2)
+	on conflict(epoch) do update set timestamp = ?2;`,
+		func(stmt *sql.Statement) {
+			stmt.BindInt64(1, int64(epoch))
+			stmt.BindInt64(2, timestamp.Unix())
+		}, nil)
+	if err != nil {
+		return fmt.Errorf("insert request time for epoch %v failed: %w", epoch, err)
+	}
+	return nil
+}
+
+func GetRequestTime(db sql.Executor, epoch types.EpochID) (time.Time, error) {
+	var timestamp 
time.Time
+	rows, err := db.Exec("select timestamp from atx_sync_requests where epoch = ?1",
+		func(stmt *sql.Statement) {
+			stmt.BindInt64(1, int64(epoch))
+		}, func(stmt *sql.Statement) bool {
+			timestamp = time.Unix(stmt.ColumnInt64(0), 0)
+			return true
+		})
+	if err != nil {
+		return time.Time{}, fmt.Errorf("select request time for epoch %v failed: %w", epoch, err)
+	} else if rows == 0 {
+		return time.Time{}, fmt.Errorf("%w: no request time for epoch %v", sql.ErrNotFound, epoch)
+	}
+	return timestamp, nil
+}
+
+func Clear(db sql.Executor) error {
+	_, err := db.Exec(`delete from atx_sync_state`, nil, nil)
+	if err != nil {
+		return fmt.Errorf("clear atx sync state failed: %w", err)
+	}
+	_, err = db.Exec(`delete from atx_sync_requests`, nil, nil)
+	if err != nil {
+		return fmt.Errorf("clear atx sync requests failed: %w", err)
+	}
+	return nil
+}
diff --git a/sql/atxsync/atxsync_test.go b/sql/atxsync/atxsync_test.go
new file mode 100644
index 00000000000..24b86951296
--- /dev/null
+++ b/sql/atxsync/atxsync_test.go
@@ -0,0 +1,104 @@
+package atxsync
+
+import (
+	"encoding/binary"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/spacemeshos/go-spacemesh/common/types"
+	"github.com/spacemeshos/go-spacemesh/sql"
+	"github.com/spacemeshos/go-spacemesh/sql/localsql"
+)
+
+func TestSyncState(t *testing.T) {
+	db := localsql.InMemory()
+	for epoch := types.EpochID(1); epoch < types.EpochID(5); epoch++ {
+		state, err := GetSyncState(db, epoch)
+		require.NoError(t, err)
+		require.Empty(t, state)
+
+		states := map[types.ATXID]int{}
+		const size = 100
+		for i := 0; i < size; i++ {
+			id := types.ATXID{}
+			binary.BigEndian.PutUint64(id[:], uint64(i))
+			states[id] = 0
+		}
+		require.NoError(t, SaveSyncState(db, epoch, states, 1))
+
+		state, err = GetSyncState(db, epoch)
+		require.NoError(t, err)
+
+		require.Len(t, state, size)
+		const max = 10
+		for i := 0; i < size; i++ {
+			id := types.ATXID{}
+			binary.BigEndian.PutUint64(id[:], uint64(i))
+			requests, exists := state[id]
+			require.True(t, exists)
+			require.Equal(t, 0, requests)
+			if i < size/2 {
+				state[id] = 1
+			} else {
+				state[id] = max
+			}
+		}
+
+		require.NoError(t, SaveSyncState(db, epoch, state, max))
+
+		updated, err := GetSyncState(db, epoch)
+		require.NoError(t, err)
+		for i := 0; i < size; i++ {
+			id := types.ATXID{}
+			binary.BigEndian.PutUint64(id[:], uint64(i))
+			if i >= size/2 {
+				delete(state, id)
+			}
+		}
+		require.Equal(t, state, updated)
+
+	}
+}
+
+func TestRequestTime(t *testing.T) {
+	db := localsql.InMemory()
+	for epoch := types.EpochID(1); epoch < types.EpochID(5); epoch++ {
+		timestamp, err := GetRequestTime(db, epoch)
+		require.ErrorIs(t, err, sql.ErrNotFound)
+		require.True(t, timestamp.IsZero())
+
+		for step := time.Duration(0); step < 10*time.Second; step += time.Second {
+			now := time.Now().Add(step)
+			require.NoError(t, SaveRequestTime(db, epoch, now))
+
+			timestamp, err = GetRequestTime(db, epoch)
+			require.NoError(t, err)
+			// now is truncated to whole seconds, as we discard nanoseconds when saving the request time
+			require.Equal(t, now.Truncate(time.Second), timestamp)
+		}
+	}
+}
+
+func TestClear(t *testing.T) {
+	db := localsql.InMemory()
+	// add some state
+	for epoch := types.EpochID(1); epoch < types.EpochID(5); epoch++ {
+		states := map[types.ATXID]int{}
+		const size = 100
+		for i := 0; i < size; i++ {
+			id := types.ATXID{}
+			binary.BigEndian.PutUint64(id[:], uint64(i))
+			states[id] = 0
+		}
+		require.NoError(t, SaveSyncState(db, epoch, states, 
1)) + require.NoError(t, SaveRequestTime(db, epoch, time.Now())) + } + require.NoError(t, Clear(db)) + for epoch := types.EpochID(1); epoch < types.EpochID(5); epoch++ { + state, err := GetSyncState(db, epoch) + require.NoError(t, err) + require.Empty(t, state) + } +} diff --git a/sql/migrations/local/0004_atx_sync.sql b/sql/migrations/local/0004_atx_sync.sql new file mode 100644 index 00000000000..dfe8afcbed9 --- /dev/null +++ b/sql/migrations/local/0004_atx_sync.sql @@ -0,0 +1,14 @@ +CREATE TABLE atx_sync_state +( + epoch INT NOT NULL, + id CHAR(32) NOT NULL, + requests INT NOT NULL DEFAULT 0, + PRIMARY KEY (epoch, id) +) WITHOUT ROWID; + +CREATE TABLE atx_sync_requests +( + epoch INT NOT NULL, + timestamp INT NOT NULL, + PRIMARY KEY (epoch) +) WITHOUT ROWID; \ No newline at end of file diff --git a/syncer/atxsync/mocks/mocks.go b/syncer/atxsync/mocks/mocks.go new file mode 100644 index 00000000000..0f24359089b --- /dev/null +++ b/syncer/atxsync/mocks/mocks.go @@ -0,0 +1,226 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./syncer.go +// +// Generated by this command: +// +// mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./syncer.go +// + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + time "time" + + types "github.com/spacemeshos/go-spacemesh/common/types" + fetch "github.com/spacemeshos/go-spacemesh/fetch" + p2p "github.com/spacemeshos/go-spacemesh/p2p" + system "github.com/spacemeshos/go-spacemesh/system" + gomock "go.uber.org/mock/gomock" +) + +// Mockclock is a mock of clock interface. +type Mockclock struct { + ctrl *gomock.Controller + recorder *MockclockMockRecorder +} + +// MockclockMockRecorder is the mock recorder for Mockclock. +type MockclockMockRecorder struct { + mock *Mockclock +} + +// NewMockclock creates a new mock instance. +func NewMockclock(ctrl *gomock.Controller) *Mockclock { + mock := &Mockclock{ctrl: ctrl} + mock.recorder = &MockclockMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *Mockclock) EXPECT() *MockclockMockRecorder { + return m.recorder +} + +// LayerToTime mocks base method. +func (m *Mockclock) LayerToTime(arg0 types.LayerID) time.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LayerToTime", arg0) + ret0, _ := ret[0].(time.Time) + return ret0 +} + +// LayerToTime indicates an expected call of LayerToTime. +func (mr *MockclockMockRecorder) LayerToTime(arg0 any) *MockclockLayerToTimeCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LayerToTime", reflect.TypeOf((*Mockclock)(nil).LayerToTime), arg0) + return &MockclockLayerToTimeCall{Call: call} +} + +// MockclockLayerToTimeCall wrap *gomock.Call +type MockclockLayerToTimeCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockclockLayerToTimeCall) Return(arg0 time.Time) *MockclockLayerToTimeCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockclockLayerToTimeCall) Do(f func(types.LayerID) time.Time) *MockclockLayerToTimeCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockclockLayerToTimeCall) DoAndReturn(f func(types.LayerID) time.Time) *MockclockLayerToTimeCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// Mockfetcher is a mock of fetcher interface. 
+type Mockfetcher struct { + ctrl *gomock.Controller + recorder *MockfetcherMockRecorder +} + +// MockfetcherMockRecorder is the mock recorder for Mockfetcher. +type MockfetcherMockRecorder struct { + mock *Mockfetcher +} + +// NewMockfetcher creates a new mock instance. +func NewMockfetcher(ctrl *gomock.Controller) *Mockfetcher { + mock := &Mockfetcher{ctrl: ctrl} + mock.recorder = &MockfetcherMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *Mockfetcher) EXPECT() *MockfetcherMockRecorder { + return m.recorder +} + +// GetAtxs mocks base method. +func (m *Mockfetcher) GetAtxs(arg0 context.Context, arg1 []types.ATXID, arg2 ...system.GetAtxOpt) error { + m.ctrl.T.Helper() + varargs := []any{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GetAtxs", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// GetAtxs indicates an expected call of GetAtxs. +func (mr *MockfetcherMockRecorder) GetAtxs(arg0, arg1 any, arg2 ...any) *MockfetcherGetAtxsCall { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0, arg1}, arg2...) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAtxs", reflect.TypeOf((*Mockfetcher)(nil).GetAtxs), varargs...) + return &MockfetcherGetAtxsCall{Call: call} +} + +// MockfetcherGetAtxsCall wrap *gomock.Call +type MockfetcherGetAtxsCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockfetcherGetAtxsCall) Return(arg0 error) *MockfetcherGetAtxsCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockfetcherGetAtxsCall) Do(f func(context.Context, []types.ATXID, ...system.GetAtxOpt) error) *MockfetcherGetAtxsCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockfetcherGetAtxsCall) DoAndReturn(f func(context.Context, []types.ATXID, ...system.GetAtxOpt) error) *MockfetcherGetAtxsCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// PeerEpochInfo mocks base method. +func (m *Mockfetcher) PeerEpochInfo(arg0 context.Context, arg1 p2p.Peer, arg2 types.EpochID) (*fetch.EpochData, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PeerEpochInfo", arg0, arg1, arg2) + ret0, _ := ret[0].(*fetch.EpochData) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PeerEpochInfo indicates an expected call of PeerEpochInfo. 
+func (mr *MockfetcherMockRecorder) PeerEpochInfo(arg0, arg1, arg2 any) *MockfetcherPeerEpochInfoCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PeerEpochInfo", reflect.TypeOf((*Mockfetcher)(nil).PeerEpochInfo), arg0, arg1, arg2) + return &MockfetcherPeerEpochInfoCall{Call: call} +} + +// MockfetcherPeerEpochInfoCall wrap *gomock.Call +type MockfetcherPeerEpochInfoCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockfetcherPeerEpochInfoCall) Return(arg0 *fetch.EpochData, arg1 error) *MockfetcherPeerEpochInfoCall { + c.Call = c.Call.Return(arg0, arg1) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockfetcherPeerEpochInfoCall) Do(f func(context.Context, p2p.Peer, types.EpochID) (*fetch.EpochData, error)) *MockfetcherPeerEpochInfoCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockfetcherPeerEpochInfoCall) DoAndReturn(f func(context.Context, p2p.Peer, types.EpochID) (*fetch.EpochData, error)) *MockfetcherPeerEpochInfoCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + +// SelectBestShuffled mocks base method. +func (m *Mockfetcher) SelectBestShuffled(arg0 int) []p2p.Peer { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SelectBestShuffled", arg0) + ret0, _ := ret[0].([]p2p.Peer) + return ret0 +} + +// SelectBestShuffled indicates an expected call of SelectBestShuffled. +func (mr *MockfetcherMockRecorder) SelectBestShuffled(arg0 any) *MockfetcherSelectBestShuffledCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SelectBestShuffled", reflect.TypeOf((*Mockfetcher)(nil).SelectBestShuffled), arg0) + return &MockfetcherSelectBestShuffledCall{Call: call} +} + +// MockfetcherSelectBestShuffledCall wrap *gomock.Call +type MockfetcherSelectBestShuffledCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockfetcherSelectBestShuffledCall) Return(arg0 []p2p.Peer) *MockfetcherSelectBestShuffledCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockfetcherSelectBestShuffledCall) Do(f func(int) []p2p.Peer) *MockfetcherSelectBestShuffledCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockfetcherSelectBestShuffledCall) DoAndReturn(f func(int) []p2p.Peer) *MockfetcherSelectBestShuffledCall { + c.Call = c.Call.DoAndReturn(f) + return c +} diff --git a/syncer/atxsync/syncer.go b/syncer/atxsync/syncer.go new file mode 100644 index 00000000000..88356f08c5f --- /dev/null +++ b/syncer/atxsync/syncer.go @@ -0,0 +1,349 @@ +package atxsync + +import ( + "context" + "errors" + "fmt" + "time" + + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/fetch" + "github.com/spacemeshos/go-spacemesh/log" + "github.com/spacemeshos/go-spacemesh/p2p" + "github.com/spacemeshos/go-spacemesh/p2p/pubsub" + "github.com/spacemeshos/go-spacemesh/p2p/server" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/atxs" + "github.com/spacemeshos/go-spacemesh/sql/atxsync" + "github.com/spacemeshos/go-spacemesh/sql/localsql" + "github.com/spacemeshos/go-spacemesh/system" +) + +//go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./syncer.go + +type clock interface { + 
LayerToTime(types.LayerID) time.Time
+}
+
+type fetcher interface {
+	SelectBestShuffled(int) []p2p.Peer
+	PeerEpochInfo(context.Context, p2p.Peer, types.EpochID) (*fetch.EpochData, error)
+	system.AtxFetcher
+}
+
+type Opt func(*Syncer)
+
+func WithLogger(logger *zap.Logger) Opt {
+	return func(s *Syncer) {
+		s.logger = logger
+	}
+}
+
+func DefaultConfig() Config {
+	return Config{
+		EpochInfoInterval: 30 * time.Minute,
+		AtxsBatch:         1000,
+		RequestsLimit:     20,
+		EpochInfoPeers:    2,
+		ProgressFraction:  0.1,
+		ProgressInterval:  20 * time.Minute,
+	}
+}
+
+type Config struct {
+	// EpochInfoInterval is the interval between epoch info requests to the network.
+	EpochInfoInterval time.Duration `mapstructure:"epoch-info-request-interval"`
+	// EpochInfoPeers is the number of peers that will be asked for epoch info, once per EpochInfoInterval.
+	EpochInfoPeers int `mapstructure:"epoch-info-peers"`
+
+	// RequestsLimit is the maximum number of requests for a single activation.
+	//
+	// It exists to prevent a peer from advertising an invalid atx and disappearing,
+	// which would make the node keep asking other peers for the invalid atx.
+	// The counter is reset to 0 once the atx is advertised again.
+	RequestsLimit int `mapstructure:"requests-limit"`
+
+	// AtxsBatch is the maximum number of atxs to sync in a single request.
+	AtxsBatch int `mapstructure:"atxs-batch"`
+
+	// ProgressFraction reports progress every time this fraction of the total has been downloaded.
+	ProgressFraction float64 `mapstructure:"progress-every-fraction"`
+	// ProgressInterval reports progress at least once per interval.
+	ProgressInterval time.Duration `mapstructure:"progress-on-time"`
+}
+
+func WithConfig(cfg Config) Opt {
+	return func(s *Syncer) {
+		s.cfg = cfg
+	}
+}
+
+func New(fetcher fetcher, clock clock, db sql.Executor, localdb *localsql.Database, opts ...Opt) *Syncer {
+	s := &Syncer{
+		logger:  zap.NewNop(),
+		cfg:     DefaultConfig(),
+		fetcher: fetcher,
+		clock:   clock,
+		db:      db,
+		localdb: localdb,
+	}
+	for _, opt := range opts {
+		opt(s)
+	}
+	return s
+}
+
+type Syncer struct {
+	logger  *zap.Logger
+	cfg     Config
+	fetcher fetcher
+	clock   clock
+	db      sql.Executor
+	localdb *localsql.Database
+}
+
+func (s *Syncer) closeToTheEpoch(publish types.EpochID, timestamp time.Time) bool {
+	target := publish + 1
+	epochStart := s.clock.LayerToTime(target.FirstLayer())
+	return timestamp.After(epochStart) || epochStart.Sub(timestamp) < 2*s.cfg.EpochInfoInterval
+}
+
+func (s *Syncer) Download(parent context.Context, publish types.EpochID) error {
+	s.logger.Info("starting atx sync", log.ZContext(parent), publish.Field().Zap())
+
+	state, err := atxsync.GetSyncState(s.localdb, publish)
+	if err != nil {
+		return fmt.Errorf("failed to get state for epoch %v: %w", publish, err)
+	}
+	lastSuccess, err := atxsync.GetRequestTime(s.localdb, publish)
+	if err != nil && !errors.Is(err, sql.ErrNotFound) {
+		return fmt.Errorf("failed to get last request time for epoch %v: %w", publish, err)
+	}
+	// when immediate is set we request epoch info without waiting for EpochInfoInterval
+	immediate := len(state) == 0 || (errors.Is(err, sql.ErrNotFound) || !s.closeToTheEpoch(publish, lastSuccess))
+
+	ctx, cancel := context.WithCancel(parent)
+	eg, ctx := errgroup.WithContext(ctx)
+	updates := make(chan epochUpdate, s.cfg.EpochInfoPeers)
+	if len(state) == 0 {
+		state = map[types.ATXID]int{}
+	} else {
+		updates <- epochUpdate{time: lastSuccess, update: state}
+	}
+	// termination requires two conditions:
+	// - epoch info has to be successfully downloaded close to or after the epoch start
+	// 
- all atxs from that epoch have to be downloaded or deemed unavailable.
+	// an atx is unavailable if it was requested more than RequestsLimit times and no peer provided it.
+	eg.Go(func() error {
+		return s.downloadEpochInfo(ctx, publish, immediate, updates)
+	})
+	eg.Go(func() error {
+		err := s.downloadAtxs(ctx, publish, state, updates)
+		cancel()
+		return err
+	})
+	if err := eg.Wait(); err != nil {
+		return err
+	}
+	return parent.Err()
+}
+
+func (s *Syncer) downloadEpochInfo(
+	ctx context.Context,
+	publish types.EpochID,
+	immediate bool,
+	updates chan<- epochUpdate,
+) error {
+	interval := s.cfg.EpochInfoInterval
+	if immediate {
+		interval = 0
+	}
+	for {
+		if interval != 0 {
+			s.logger.Debug(
+				"waiting between epoch info requests",
+				publish.Field().Zap(),
+				zap.Duration("duration", interval),
+			)
+		}
+		select {
+		case <-ctx.Done():
+			return nil
+		// TODO(dshulyak) this has to be randomized in a followup
+		// when sync is scheduled in advance, in order to smooth out the request rate across the network
+		case <-time.After(interval):
+		}
+
+		peers := s.fetcher.SelectBestShuffled(s.cfg.EpochInfoPeers)
+		if len(peers) == 0 {
+			return errors.New("no peers available")
+		}
+		// do not run requests concurrently; epoch info is large and will continue to grow
+		for _, peer := range peers {
+			epochData, err := s.fetcher.PeerEpochInfo(ctx, peer, publish)
+			if err != nil || epochData == nil {
+				if errors.Is(err, context.Canceled) {
+					return nil
+				}
+				s.logger.Warn("failed to download epoch info",
+					log.ZContext(ctx),
+					publish.Field().Zap(),
+					zap.String("peer", peer.String()),
+					zap.Error(err),
+				)
+				continue
+			}
+			s.logger.Info("downloaded epoch info",
+				log.ZContext(ctx),
+				publish.Field().Zap(),
+				zap.String("peer", peer.String()),
+				zap.Int("atxs", len(epochData.AtxIDs)),
+			)
+			// adding hashes to the fetcher is not useful as they overflow the cache and are not used,
+			// so we switch to asking the best peers immediately
+			update := make(map[types.ATXID]int, len(epochData.AtxIDs))
+			for _, atx := range epochData.AtxIDs {
+				update[atx] = 0
+			}
+			select {
+			case <-ctx.Done():
+				return nil
+			case updates <- epochUpdate{time: time.Now(), update: update}:
+			}
+			// after the first success, wait a full interval between requests
+			interval = s.cfg.EpochInfoInterval
+		}
+	}
+}
+
+func (s *Syncer) downloadAtxs(
+	ctx context.Context,
+	publish types.EpochID,
+	state map[types.ATXID]int,
+	updates <-chan epochUpdate,
+) error {
+	var (
+		batch                = make([]types.ATXID, 0, s.cfg.AtxsBatch)
+		downloaded           = map[types.ATXID]bool{}
+		previouslyDownloaded = 0
+		start                = time.Now()
+		lastSuccess          time.Time
+		progressTimestamp    = start
+		nothingToDownload    = len(state) == 0
+	)
+
+	for {
+		// wait for an update if there is nothing to download
+		if nothingToDownload && s.closeToTheEpoch(publish, lastSuccess) {
+			s.logger.Info(
+				"atx sync completed",
+				log.ZContext(ctx),
+				publish.Field().Zap(),
+				zap.Int("downloaded", len(downloaded)),
+				zap.Int("total", len(state)),
+				zap.Int("unavailable", len(state)-len(downloaded)),
+				zap.Duration("duration", time.Since(start)),
+			)
+			return nil
+		}
+		if nothingToDownload {
+			select {
+			case <-ctx.Done():
+				return nil
+			case update := <-updates:
+				lastSuccess = update.time
+				for atx, count := range update.update {
+					state[atx] = count
+				}
+			}
+		} else {
+			// otherwise check for updates periodically but don't stop downloading
+			select {
+			case <-ctx.Done():
+				return nil
+			case update := <-updates:
+				lastSuccess = update.time
+				for atx, count := range update.update {
+					state[atx] = count
+				}
+
default: + } + } + + for atx, requests := range state { + if downloaded[atx] { + continue + } + exists, err := atxs.Has(s.db, atx) + if err != nil { + return err + } + if exists { + downloaded[atx] = true + continue + } + if requests >= s.cfg.RequestsLimit { + delete(state, atx) + continue + } + batch = append(batch, atx) + if len(batch) == cap(batch) { + break + } + } + nothingToDownload = len(batch) == 0 + + if progress := float64(len(downloaded) - previouslyDownloaded); progress/float64( + len(state), + ) > s.cfg.ProgressFraction && s.cfg.ProgressFraction != 0 || + time.Since(progressTimestamp) > s.cfg.ProgressInterval && s.cfg.ProgressInterval != 0 { + s.logger.Info( + "atx sync progress", + log.ZContext(ctx), + publish.Field().Zap(), + zap.Int("downloaded", len(downloaded)), + zap.Int("total", len(state)), + zap.Int("progress", int(progress)), + zap.Float64("rate per sec", progress/time.Since(progressTimestamp).Seconds()), + ) + previouslyDownloaded = len(downloaded) + progressTimestamp = time.Now() + } + if len(batch) > 0 { + if err := s.fetcher.GetAtxs(ctx, batch); err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + s.logger.Debug("failed to download atxs", log.ZContext(ctx), log.NiceZapError(err)) + batchError := &fetch.BatchError{} + if errors.As(err, &batchError) { + for hash, err := range batchError.Errors { + if errors.Is(err, server.ErrPeerResponseFailed) { + state[types.ATXID(hash)]++ + } else if errors.Is(err, pubsub.ErrValidationReject) { + state[types.ATXID(hash)] = s.cfg.RequestsLimit + } + } + } + } + } + + if err := s.localdb.WithTx(ctx, func(tx *sql.Tx) error { + if err := atxsync.SaveRequestTime(tx, publish, lastSuccess); err != nil { + return fmt.Errorf("failed to save request time: %w", err) + } + return atxsync.SaveSyncState(tx, publish, state, s.cfg.RequestsLimit) + }); err != nil { + return fmt.Errorf("failed to persist state for epoch %v: %w", publish, err) + } + batch = batch[:0] + } +} + +type epochUpdate struct { + time time.Time + update map[types.ATXID]int +} diff --git a/syncer/atxsync/syncer_test.go b/syncer/atxsync/syncer_test.go new file mode 100644 index 00000000000..9508a062c62 --- /dev/null +++ b/syncer/atxsync/syncer_test.go @@ -0,0 +1,228 @@ +package atxsync + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + "github.com/spacemeshos/go-spacemesh/common/types" + "github.com/spacemeshos/go-spacemesh/fetch" + "github.com/spacemeshos/go-spacemesh/log/logtest" + "github.com/spacemeshos/go-spacemesh/p2p" + "github.com/spacemeshos/go-spacemesh/p2p/server" + "github.com/spacemeshos/go-spacemesh/sql" + "github.com/spacemeshos/go-spacemesh/sql/atxs" + "github.com/spacemeshos/go-spacemesh/sql/atxsync" + "github.com/spacemeshos/go-spacemesh/sql/localsql" + "github.com/spacemeshos/go-spacemesh/syncer/atxsync/mocks" + "github.com/spacemeshos/go-spacemesh/system" +) + +func init() { + types.SetLayersPerEpoch(10) +} + +func aid(id string) types.ATXID { + var atx types.ATXID + copy(atx[:], id) + return atx +} + +func edata(ids ...string) *fetch.EpochData { + ed := &fetch.EpochData{} + for _, id := range ids { + ed.AtxIDs = append(ed.AtxIDs, aid(id)) + } + return ed +} + +func newTester(tb testing.TB, cfg Config) *tester { + localdb := localsql.InMemory() + db := sql.InMemory() + ctrl := gomock.NewController(tb) + clock := mocks.NewMockclock(ctrl) + fetcher := 
mocks.NewMockfetcher(ctrl) + syncer := New(fetcher, clock, db, localdb, WithConfig(cfg), WithLogger(logtest.New(tb).Zap())) + return &tester{ + tb: tb, + syncer: syncer, + localdb: localdb, + db: db, + cfg: cfg, + ctrl: ctrl, + clock: clock, + fetcher: fetcher, + } +} + +type tester struct { + tb testing.TB + syncer *Syncer + localdb *localsql.Database + db *sql.Database + cfg Config + ctrl *gomock.Controller + clock *mocks.Mockclock + fetcher *mocks.Mockfetcher +} + +func TestSyncer(t *testing.T) { + t.Run("sanity", func(t *testing.T) { + tester := newTester(t, Config{ + EpochInfoInterval: 100 * time.Microsecond, + EpochInfoPeers: 3, + RequestsLimit: 10, + AtxsBatch: 10, + }) + + peers := []p2p.Peer{"a", "b", "c"} + tester.fetcher.EXPECT().SelectBestShuffled(tester.cfg.EpochInfoPeers).Return(peers).AnyTimes() + publish := types.EpochID(1) + for _, p := range peers { + tester.fetcher.EXPECT(). + PeerEpochInfo(gomock.Any(), p, publish). + Return(edata("4", "1", "3", "2"), nil). + AnyTimes() + } + + tester.fetcher.EXPECT(). + GetAtxs(gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, ids []types.ATXID, _ ...system.GetAtxOpt) error { + for _, id := range ids { + require.NoError(t, atxs.Add(tester.db, atx(id))) + } + return nil + }).AnyTimes() + + past := time.Now().Add(-time.Minute) + tester.clock.EXPECT().LayerToTime((publish + 1).FirstLayer()).Return(past).AnyTimes() + require.NoError(t, tester.syncer.Download(context.Background(), publish)) + }) + t.Run("interruptible", func(t *testing.T) { + tester := newTester(t, DefaultConfig()) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + publish := types.EpochID(1) + tester.clock.EXPECT().LayerToTime((publish + 1).FirstLayer()).Return(time.Now()).AnyTimes() + require.ErrorIs(t, tester.syncer.Download(ctx, publish), context.Canceled) + }) + t.Run("error on no peers", func(t *testing.T) { + tester := newTester(t, DefaultConfig()) + publish := types.EpochID(1) + tester.clock.EXPECT().LayerToTime((publish + 1).FirstLayer()).Return(time.Now()).AnyTimes() + tester.fetcher.EXPECT().SelectBestShuffled(tester.cfg.EpochInfoPeers).Return(nil) + require.ErrorContains(t, tester.syncer.Download(context.Background(), publish), "no peers available") + }) + t.Run("terminate without queries if sync on time", func(t *testing.T) { + tester := newTester(t, DefaultConfig()) + publish := types.EpochID(2) + now := time.Now() + tester.clock.EXPECT().LayerToTime((publish + 1).FirstLayer()).Return(now).AnyTimes() + + state := map[types.ATXID]int{ + aid("1"): 0, + aid("2"): 0, + } + require.NoError(t, atxsync.SaveSyncState(tester.localdb, publish, state, tester.cfg.AtxsBatch)) + lastSuccess := now.Add(-tester.cfg.EpochInfoInterval) + require.NoError(t, atxsync.SaveRequestTime(tester.localdb, publish, lastSuccess)) + for id := range state { + require.NoError(t, atxs.Add(tester.db, atx(id))) + } + require.NoError(t, tester.syncer.Download(context.Background(), publish)) + }) + t.Run("immediate epoch info retries", func(t *testing.T) { + tester := newTester(t, Config{ + EpochInfoInterval: 10 * time.Second, + EpochInfoPeers: 3, + RequestsLimit: 10, + AtxsBatch: 10, + }) + ctx, cancel := context.WithTimeout(context.Background(), tester.cfg.EpochInfoInterval/2) + defer cancel() + + peers := []p2p.Peer{"a"} + tester.fetcher.EXPECT().SelectBestShuffled(tester.cfg.EpochInfoPeers).Return(peers).AnyTimes() + publish := types.EpochID(2) + tester.fetcher.EXPECT().PeerEpochInfo(gomock.Any(), peers[0], publish).Return(nil, errors.New("bad try")) + 
tester.fetcher.EXPECT().PeerEpochInfo(gomock.Any(), peers[0], publish).Return(edata("1", "2", "3"), nil) + + tester.fetcher.EXPECT(). + GetAtxs(gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, ids []types.ATXID, _ ...system.GetAtxOpt) error { + for _, id := range ids { + require.NoError(t, atxs.Add(tester.db, atx(id))) + } + return nil + }).AnyTimes() + + past := time.Now().Add(-time.Minute) + tester.clock.EXPECT().LayerToTime((publish + 1).FirstLayer()).Return(past).AnyTimes() + require.NoError(t, tester.syncer.Download(ctx, publish)) + }) + t.Run("give up on atx after max retries", func(t *testing.T) { + tester := newTester(t, Config{ + EpochInfoInterval: 200 * time.Hour, + EpochInfoPeers: 2, + RequestsLimit: 10, + AtxsBatch: 2, + }) + + peers := []p2p.Peer{"a", "b"} + tester.fetcher.EXPECT().SelectBestShuffled(tester.cfg.EpochInfoPeers).Return(peers).AnyTimes() + publish := types.EpochID(2) + good := edata("1", "2", "3") + bad := edata("4", "5", "6") + tester.fetcher.EXPECT().PeerEpochInfo(gomock.Any(), peers[0], publish).Return(good, nil) + tester.fetcher.EXPECT().PeerEpochInfo(gomock.Any(), peers[1], publish).Return(bad, nil) + + tester.fetcher.EXPECT(). + GetAtxs(gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, ids []types.ATXID, _ ...system.GetAtxOpt) error { + require.LessOrEqual(t, len(ids), tester.cfg.AtxsBatch) + berr := fetch.BatchError{} + for _, id := range ids { + for _, good := range good.AtxIDs { + if good == id { + require.NoError(t, atxs.Add(tester.db, atx(id))) + } + } + for _, bad := range bad.AtxIDs { + if bad == id { + berr.Add(bad.Hash32(), fmt.Errorf("%w: test", server.ErrPeerResponseFailed)) + } + } + } + if berr.Empty() { + return nil + } + return &berr + }). + AnyTimes() + + past := time.Now().Add(-time.Minute) + tester.clock.EXPECT().LayerToTime((publish + 1).FirstLayer()).Return(past).AnyTimes() + require.NoError(t, tester.syncer.Download(context.Background(), publish)) + + state, err := atxsync.GetSyncState(tester.localdb, publish) + require.NoError(t, err) + for _, bad := range bad.AtxIDs { + require.NotContains(t, state, bad) + } + }) + t.Run("terminate empty epoch", func(t *testing.T) { + tester := newTester(t, DefaultConfig()) + publish := types.EpochID(2) + now := time.Now() + tester.clock.EXPECT().LayerToTime((publish + 1).FirstLayer()).Return(now).AnyTimes() + peers := []p2p.Peer{"a"} + tester.fetcher.EXPECT().SelectBestShuffled(tester.cfg.EpochInfoPeers).Return(peers).AnyTimes() + tester.fetcher.EXPECT().PeerEpochInfo(gomock.Any(), peers[0], publish).Return(edata(), nil) + require.NoError(t, tester.syncer.Download(context.Background(), publish)) + }) +} diff --git a/syncer/interface.go b/syncer/interface.go index 62c0b4de539..af57b4ea47f 100644 --- a/syncer/interface.go +++ b/syncer/interface.go @@ -33,7 +33,10 @@ type fetchLogic interface { bool, []p2p.Peer, ) ([]*fetch.LayerOpinion, []*types.Certificate, error) - GetEpochATXs(context.Context, types.EpochID) error +} + +type atxSyncer interface { + Download(context.Context, types.EpochID) error } // fetcher is the interface to the low-level fetching. diff --git a/syncer/mocks/mocks.go b/syncer/mocks/mocks.go index 69deec43813..a0927f3c8c7 100644 --- a/syncer/mocks/mocks.go +++ b/syncer/mocks/mocks.go @@ -322,44 +322,6 @@ func (c *MockfetchLogicGetCertCall) DoAndReturn(f func(context.Context, types.La return c } -// GetEpochATXs mocks base method. 
-func (m *MockfetchLogic) GetEpochATXs(arg0 context.Context, arg1 types.EpochID) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetEpochATXs", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// GetEpochATXs indicates an expected call of GetEpochATXs. -func (mr *MockfetchLogicMockRecorder) GetEpochATXs(arg0, arg1 any) *MockfetchLogicGetEpochATXsCall { - mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEpochATXs", reflect.TypeOf((*MockfetchLogic)(nil).GetEpochATXs), arg0, arg1) - return &MockfetchLogicGetEpochATXsCall{Call: call} -} - -// MockfetchLogicGetEpochATXsCall wrap *gomock.Call -type MockfetchLogicGetEpochATXsCall struct { - *gomock.Call -} - -// Return rewrite *gomock.Call.Return -func (c *MockfetchLogicGetEpochATXsCall) Return(arg0 error) *MockfetchLogicGetEpochATXsCall { - c.Call = c.Call.Return(arg0) - return c -} - -// Do rewrite *gomock.Call.Do -func (c *MockfetchLogicGetEpochATXsCall) Do(f func(context.Context, types.EpochID) error) *MockfetchLogicGetEpochATXsCall { - c.Call = c.Call.Do(f) - return c -} - -// DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *MockfetchLogicGetEpochATXsCall) DoAndReturn(f func(context.Context, types.EpochID) error) *MockfetchLogicGetEpochATXsCall { - c.Call = c.Call.DoAndReturn(f) - return c -} - // GetLayerData mocks base method. func (m *MockfetchLogic) GetLayerData(arg0 context.Context, arg1 p2p.Peer, arg2 types.LayerID) ([]byte, error) { m.ctrl.T.Helper() @@ -788,6 +750,67 @@ func (c *MockfetchLogicSelectBestShuffledCall) DoAndReturn(f func(int) []p2p.Pee return c } +// MockatxSyncer is a mock of atxSyncer interface. +type MockatxSyncer struct { + ctrl *gomock.Controller + recorder *MockatxSyncerMockRecorder +} + +// MockatxSyncerMockRecorder is the mock recorder for MockatxSyncer. +type MockatxSyncerMockRecorder struct { + mock *MockatxSyncer +} + +// NewMockatxSyncer creates a new mock instance. +func NewMockatxSyncer(ctrl *gomock.Controller) *MockatxSyncer { + mock := &MockatxSyncer{ctrl: ctrl} + mock.recorder = &MockatxSyncerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockatxSyncer) EXPECT() *MockatxSyncerMockRecorder { + return m.recorder +} + +// Download mocks base method. +func (m *MockatxSyncer) Download(arg0 context.Context, arg1 types.EpochID) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Download", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Download indicates an expected call of Download. 
+func (mr *MockatxSyncerMockRecorder) Download(arg0, arg1 any) *MockatxSyncerDownloadCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockatxSyncer)(nil).Download), arg0, arg1) + return &MockatxSyncerDownloadCall{Call: call} +} + +// MockatxSyncerDownloadCall wrap *gomock.Call +type MockatxSyncerDownloadCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockatxSyncerDownloadCall) Return(arg0 error) *MockatxSyncerDownloadCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockatxSyncerDownloadCall) Do(f func(context.Context, types.EpochID) error) *MockatxSyncerDownloadCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockatxSyncerDownloadCall) DoAndReturn(f func(context.Context, types.EpochID) error) *MockatxSyncerDownloadCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + // Mockfetcher is a mock of fetcher interface. type Mockfetcher struct { ctrl *gomock.Controller diff --git a/syncer/syncer.go b/syncer/syncer.go index 45135209494..32850695d70 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -16,6 +16,7 @@ import ( "github.com/spacemeshos/go-spacemesh/log" "github.com/spacemeshos/go-spacemesh/mesh" "github.com/spacemeshos/go-spacemesh/p2p" + "github.com/spacemeshos/go-spacemesh/syncer/atxsync" "github.com/spacemeshos/go-spacemesh/system" ) @@ -27,10 +28,11 @@ type Config struct { SyncCertDistance uint32 MaxStaleDuration time.Duration `mapstructure:"maxstaleduration"` Standalone bool - GossipDuration time.Duration `mapstructure:"gossipduration"` - DisableMeshAgreement bool `mapstructure:"disable-mesh-agreement"` - DisableAtxReconciliation bool `mapstructure:"disable-atx-reconciliation"` - OutOfSyncThresholdLayers uint32 `mapstructure:"out-of-sync-threshold"` + GossipDuration time.Duration `mapstructure:"gossipduration"` + DisableMeshAgreement bool `mapstructure:"disable-mesh-agreement"` + DisableAtxReconciliation bool `mapstructure:"disable-atx-reconciliation"` + OutOfSyncThresholdLayers uint32 `mapstructure:"out-of-sync-threshold"` + AtxSync atxsync.Config `mapstructure:"atx-sync"` } // DefaultConfig for the syncer. 
@@ -43,6 +45,7 @@ func DefaultConfig() Config { MaxStaleDuration: time.Second, GossipDuration: 15 * time.Second, OutOfSyncThresholdLayers: 3, + AtxSync: atxsync.DefaultConfig(), } } @@ -114,6 +117,7 @@ type Syncer struct { cfg Config cdb *datastore.CachedDB + atxsyncer atxSyncer ticker layerTicker beacon system.BeaconGetter mesh *mesh.Mesh @@ -149,12 +153,14 @@ func NewSyncer( fetcher fetcher, patrol layerPatrol, ch certHandler, + atxSyncer atxSyncer, opts ...Option, ) *Syncer { s := &Syncer{ logger: log.NewNop(), cfg: DefaultConfig(), cdb: cdb, + atxsyncer: atxSyncer, ticker: ticker, beacon: beacon, mesh: mesh, @@ -377,7 +383,9 @@ func (s *Syncer) synchronize(ctx context.Context) bool { } if err := s.syncAtx(ctx); err != nil { - s.logger.With().Error("failed to sync atxs", log.Context(ctx), log.Err(err)) + if !errors.Is(err, context.Canceled) { + s.logger.With().Error("failed to sync atxs", log.Context(ctx), log.Err(err)) + } return false } @@ -443,7 +451,7 @@ func (s *Syncer) syncAtx(ctx context.Context) error { } } if s.cfg.DisableAtxReconciliation { - s.logger.Debug("atx sync disabled") + s.logger.Debug("atx sync reconciliation is disabled") return nil } // steady state atx syncing @@ -563,7 +571,7 @@ func (s *Syncer) syncLayer(ctx context.Context, layerID types.LayerID, peers ... // fetching ATXs published the specified epoch. func (s *Syncer) fetchATXsForEpoch(ctx context.Context, epoch types.EpochID) error { - if err := s.dataFetcher.GetEpochATXs(ctx, epoch); err != nil { + if err := s.atxsyncer.Download(ctx, epoch); err != nil { return err } s.setLastAtxEpoch(epoch) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 7896065ad2f..ca304a8da70 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -65,6 +65,7 @@ type testSyncer struct { mTicker *mockLayerTicker mDataFetcher *mocks.MockfetchLogic + mAtxSyncer *mocks.MockatxSyncer mBeacon *smocks.MockBeaconGetter mLyrPatrol *mocks.MocklayerPatrol mVm *mmocks.MockvmState @@ -78,9 +79,11 @@ func newTestSyncer(t *testing.T, interval time.Duration) *testSyncer { lg := logtest.New(t) mt := newMockLayerTicker() ctrl := gomock.NewController(t) + ts := &testSyncer{ mTicker: mt, mDataFetcher: mocks.NewMockfetchLogic(ctrl), + mAtxSyncer: mocks.NewMockatxSyncer(ctrl), mBeacon: smocks.NewMockBeaconGetter(ctrl), mLyrPatrol: mocks.NewMocklayerPatrol(ctrl), mVm: mmocks.NewMockvmState(ctrl), @@ -114,6 +117,7 @@ func newTestSyncer(t *testing.T, interval time.Duration) *testSyncer { nil, ts.mLyrPatrol, ts.mCertHdr, + ts.mAtxSyncer, WithConfig(cfg), WithLogger(lg), withDataFetcher(ts.mDataFetcher), @@ -165,7 +169,7 @@ func TestSynchronize_OnlyOneSynchronize(t *testing.T) { defer cancel() ts.syncer.Start() - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gomock.Any()).AnyTimes() + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gomock.Any()).AnyTimes() ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()) gLayer := types.GetEffectiveGenesis() @@ -223,7 +227,7 @@ func TestSynchronize_AllGood(t *testing.T) { current := gLayer.Add(10) ts.mTicker.advanceToLayer(current) for epoch := gLayer.GetEpoch(); epoch <= current.GetEpoch(); epoch++ { - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), epoch) + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), epoch) } ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()) for lid := gLayer.Add(1); lid.Before(current); lid = lid.Add(1) { @@ -276,8 +280,8 @@ func TestSynchronize_FetchLayerDataFailed(t *testing.T) { current := gLayer.Add(2) ts.mTicker.advanceToLayer(current) lyr := 
current.Sub(1) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gLayer.GetEpoch()) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gLayer.GetEpoch()+1) + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gLayer.GetEpoch()) + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gLayer.GetEpoch()+1) ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()) ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lyr).Return(errors.New("meh")) @@ -296,7 +300,7 @@ func TestSynchronize_FetchMalfeasanceFailed(t *testing.T) { current := gLayer.Add(2) ts.mTicker.advanceToLayer(current) lyr := current.Sub(1) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gomock.Any()).AnyTimes() + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gomock.Any()).AnyTimes() ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()).Return(errors.New("meh")) require.False(t, ts.syncer.synchronize(context.Background())) @@ -310,10 +314,10 @@ func TestSynchronize_FailedInitialATXsSync(t *testing.T) { current := types.LayerID(layersPerEpoch * uint32(failedEpoch+1)) ts.mTicker.advanceToLayer(current) for epoch := types.GetEffectiveGenesis().GetEpoch(); epoch < failedEpoch; epoch++ { - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), epoch) + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), epoch) } - ts.mDataFetcher.EXPECT(). - GetEpochATXs(gomock.Any(), failedEpoch). + ts.mAtxSyncer.EXPECT(). + Download(gomock.Any(), failedEpoch). Return(errors.New("no ATXs. should fail sync")) var wg sync.WaitGroup @@ -355,7 +359,7 @@ func startWithSyncedState(t *testing.T, ts *testSyncer) types.LayerID { gLayer := types.GetEffectiveGenesis() ts.mTicker.advanceToLayer(gLayer) ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gLayer.GetEpoch()) + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gLayer.GetEpoch()) require.True(t, ts.syncer.synchronize(context.Background())) require.True(t, ts.syncer.ListenToATXGossip()) require.True(t, ts.syncer.ListenToGossip()) @@ -397,7 +401,7 @@ func TestSyncAtxs_Genesis(t *testing.T) { if tc.lastSynced > 0 { require.False(t, ts.syncer.ListenToATXGossip()) for epoch := types.EpochID(1); epoch <= tc.lastSynced; epoch++ { - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), epoch) + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), epoch) } ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()) } @@ -435,7 +439,7 @@ func TestSyncAtxs(t *testing.T) { require.Equal(t, lyr.GetEpoch()-1, ts.syncer.lastAtxEpoch()) ts.mTicker.advanceToLayer(tc.current) for epoch := lyr.GetEpoch(); epoch <= tc.lastSyncEpoch; epoch++ { - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), epoch) + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), epoch) } for lid := lyr; lid < tc.current; lid++ { ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lid) @@ -451,7 +455,7 @@ func TestSynchronize_StaySyncedUponFailure(t *testing.T) { lyr := startWithSyncedState(t, ts) current := lyr.Add(1) ts.mTicker.advanceToLayer(current) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), current.GetEpoch()) + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), current.GetEpoch()) ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lyr).Return(errors.New("doh")) require.False(t, ts.syncer.synchronize(context.Background())) @@ -478,7 +482,7 @@ func TestSynchronize_BecomeNotSyncedUponFailureIfNoGossip(t *testing.T) { // test the case where the node originally starts from notSynced and eventually becomes synced. 
func TestFromNotSyncedToSynced(t *testing.T) { ts := newSyncerWithoutPeriodicRuns(t) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gomock.Any()).AnyTimes() + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gomock.Any()).AnyTimes() ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()) lyr := types.GetEffectiveGenesis().Add(1) current := lyr.Add(5) @@ -512,7 +516,7 @@ func TestFromNotSyncedToSynced(t *testing.T) { // to notSynced. func TestFromGossipSyncToNotSynced(t *testing.T) { ts := newSyncerWithoutPeriodicRuns(t) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gomock.Any()).AnyTimes() + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gomock.Any()).AnyTimes() lyr := types.GetEffectiveGenesis().Add(1) current := lyr.Add(1) ts.mTicker.advanceToLayer(current) @@ -546,7 +550,7 @@ func TestNetworkHasNoData(t *testing.T) { lyr := startWithSyncedState(t, ts) require.True(t, ts.syncer.IsSynced(context.Background())) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gomock.Any()).AnyTimes() + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gomock.Any()).AnyTimes() for lid := lyr.Add(1); lid < lyr.Add(outOfSyncThreshold+1); lid++ { ts.mTicker.advanceToLayer(lid) ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), gomock.Any()) @@ -568,7 +572,7 @@ func TestNetworkHasNoData(t *testing.T) { // eventually become synced again. func TestFromSyncedToNotSynced(t *testing.T) { ts := newSyncerWithoutPeriodicRuns(t) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gomock.Any()).AnyTimes() + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gomock.Any()).AnyTimes() ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()).AnyTimes() require.True(t, ts.syncer.synchronize(context.Background())) @@ -619,7 +623,7 @@ func waitOutGossipSync(t *testing.T, ts *testSyncer) { func TestSync_AlsoSyncProcessedLayer(t *testing.T) { ts := newSyncerWithoutPeriodicRuns(t) - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gomock.Any()).AnyTimes() + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), gomock.Any()).AnyTimes() ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()) lyr := types.GetEffectiveGenesis().Add(1) current := lyr.Add(1) @@ -692,13 +696,14 @@ func TestSynchronize_RecoverFromCheckpoint(t *testing.T) { nil, ts.mLyrPatrol, ts.mCertHdr, + ts.mAtxSyncer, WithConfig(ts.syncer.cfg), WithLogger(ts.syncer.logger), withDataFetcher(ts.mDataFetcher), withForkFinder(ts.mForkFinder), ) // should not sync any atxs before current epoch - ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), current.GetEpoch()) + ts.mAtxSyncer.EXPECT().Download(gomock.Any(), current.GetEpoch()) ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any()) require.True(t, ts.syncer.synchronize(context.Background())) require.Equal(t, current.GetEpoch(), ts.syncer.lastAtxEpoch())
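
Reviewer note: the sketch below shows how a caller can unwrap the new fetch.BatchError and classify per-ATX failures the same way downloadAtxs in syncer/atxsync/syncer.go does, using only types introduced by this diff (BatchError, server.ErrPeerResponseFailed, pubsub.ErrValidationReject). The classify helper, the main function, and the sample hashes are illustrative, not part of the change.

package main

import (
	"errors"
	"fmt"

	"github.com/spacemeshos/go-spacemesh/common/types"
	"github.com/spacemeshos/go-spacemesh/fetch"
	"github.com/spacemeshos/go-spacemesh/p2p/pubsub"
	"github.com/spacemeshos/go-spacemesh/p2p/server"
)

// classify splits a GetAtxs failure into per-ATX buckets, mirroring downloadAtxs:
// peer response failures count against RequestsLimit and may be retried,
// while validation rejects are treated as permanent.
func classify(err error) (retryable, rejected []types.ATXID) {
	batchError := &fetch.BatchError{}
	if !errors.As(err, &batchError) {
		return nil, nil // not a batch failure; the caller handles it as a whole
	}
	for hash, herr := range batchError.Errors {
		switch {
		case errors.Is(herr, server.ErrPeerResponseFailed):
			retryable = append(retryable, types.ATXID(hash))
		case errors.Is(herr, pubsub.ErrValidationReject):
			rejected = append(rejected, types.ATXID(hash))
		}
	}
	return retryable, rejected
}

func main() {
	// build a batch error with one transient and one permanent failure
	berr := &fetch.BatchError{}
	berr.Add(types.Hash32{1}, fmt.Errorf("%w: timeout", server.ErrPeerResponseFailed))
	berr.Add(types.Hash32{2}, fmt.Errorf("%w: bad atx", pubsub.ErrValidationReject))
	retryable, rejected := classify(berr)
	fmt.Println(len(retryable), len(rejected)) // prints: 1 1
}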