diff --git a/app/node/build.go b/app/node/build.go
index 2ac27b4f5..091147288 100644
--- a/app/node/build.go
+++ b/app/node/build.go
@@ -214,7 +214,7 @@ func initializeStatesyncService(ctx context.Context, d *coreDependencies, p2p *n
 func buildDB(ctx context.Context, d *coreDependencies, ss *node.StateSyncService, closers *closeFuncs) *pg.DB {
 	pg.UseLogger(d.logger.New("PG"))
-	fromSnapshot := restoreDB(d, ctx, ss)
+	fromGenesisSnapshot := restoreDB(d, ctx, ss)
 	db, err := d.dbOpener(ctx, d.cfg.DB.DBName, d.cfg.DB.MaxConns)
 	if err != nil {
@@ -222,8 +222,7 @@ func buildDB(ctx context.Context, d *coreDependencies, ss *node.StateSyncService
 	}
 	closers.addCloser(db.Close, "Closing application DB")
-	if fromSnapshot {
-		d.logger.Info("DB restored from snapshot", "snapshot", d.cfg.GenesisState)
+	if fromGenesisSnapshot {
 		// readjust the expiry heights of all the pending resolutions after snapshot restore for Zero-downtime migrations
 		// snapshot tool handles the migration expiry height readjustment for offline migrations
 		// adjustExpiration := false
@@ -254,7 +253,7 @@ func buildDB(ctx context.Context, d *coreDependencies, ss *node.StateSyncService
 //   - If statesync is enabled. Statesync will take care of syncing the database
 //     to the network state using statesync snapshots.
 //
-// returns true if the DB was restored from snapshot, false otherwise.
+// returns true if the DB was restored from genesis snapshot, false otherwise.
 func restoreDB(d *coreDependencies, ctx context.Context, ss *node.StateSyncService) bool {
 	if isDbInitialized(ctx, d) {
 		return false
@@ -269,7 +268,7 @@ func restoreDB(d *coreDependencies, ctx context.Context, ss *node.StateSyncServi
 	if success {
 		d.logger.Info("DB restored from statesync snapshot")
-		return true
+		return false
 	}
 	// If statesync is not successful, restore from the genesis snapshot if available
@@ -316,6 +315,8 @@ func restoreDB(d *coreDependencies, ctx context.Context, ss *node.StateSyncServi
 	if err != nil {
 		failBuild(err, "failed to restore DB from snapshot")
 	}
+
+	d.logger.Info("DB restored from snapshot", "snapshot", d.cfg.GenesisState)
 	return true
 }
diff --git a/node/consensus/blocksync.go b/node/consensus/blocksync.go
index 22998cca6..f8e2cd162 100644
--- a/node/consensus/blocksync.go
+++ b/node/consensus/blocksync.go
@@ -157,7 +157,7 @@ func (ce *ConsensusEngine) syncBlocksUntilHeight(ctx context.Context, startHeigh
 func (ce *ConsensusEngine) syncBlockWithRetry(ctx context.Context, height int64) error {
 	_, rawblk, ci, err := ce.getBlock(ctx, height)
 	if err != nil { // all kinds of errors?
-		return fmt.Errorf("error requesting block from network: height : %d, error: %w", height, err)
+		return err
 	}
 	return ce.applyBlock(ctx, rawblk, ci)
@@ -167,7 +167,7 @@ func (ce *ConsensusEngine) syncBlockWithRetry(ctx context.Context, height int64)
 func (ce *ConsensusEngine) syncBlock(ctx context.Context, height int64) error {
 	_, rawblk, ci, err := ce.blkRequester(ctx, height)
 	if err != nil { // all kinds of errors?
-		return fmt.Errorf("error requesting block from network: height : %d, error: %w", height, err)
+		return err
 	}
 	return ce.applyBlock(ctx, rawblk, ci)
diff --git a/node/consensus/engine.go b/node/consensus/engine.go
index 63c78bf71..7e281b133 100644
--- a/node/consensus/engine.go
+++ b/node/consensus/engine.go
@@ -797,7 +797,6 @@ func (ce *ConsensusEngine) processCurrentBlock(ctx context.Context) error {
 	// otherwise rollback.
 	blkHash, rawBlk, ci, err := ce.getBlock(ctx, ce.state.blkProp.height)
 	if err != nil {
-		ce.log.Debug("Error requesting block from network", "height", ce.state.blkProp.height, "error", err)
 		return err
 	}
diff --git a/node/node_live_test.go b/node/node_live_test.go
index 3b5badae0..164d4aed7 100644
--- a/node/node_live_test.go
+++ b/node/node_live_test.go
@@ -80,7 +80,7 @@ func TestSingleNodeMocknet(t *testing.T) {
 		valSetList = append(valSetList, &v)
 	}
-	ss := newSnapshotStore()
+	ss := newSnapshotStore(bs1)
 	_, vsReal, err := voting.NewResolutionStore(ctx, db1)
 	require.NoError(t, err)
@@ -103,7 +103,7 @@ func TestSingleNodeMocknet(t *testing.T) {
 	accounts, err := accounts.InitializeAccountStore(ctx, db1, log.DiscardLogger)
 	require.NoError(t, err)
-	migrator, err := migrations.SetupMigrator(ctx, db1, newSnapshotStore(), accounts, filepath.Join(root1, "migrations"), mparams, vsReal, log.New(log.WithName("MIGRATOR")))
+	migrator, err := migrations.SetupMigrator(ctx, db1, newSnapshotStore(bs1), accounts, filepath.Join(root1, "migrations"), mparams, vsReal, log.New(log.WithName("MIGRATOR")))
 	require.NoError(t, err)
 	signer := auth.GetNodeSigner(privKeys[0])
@@ -238,7 +238,7 @@ func TestDualNodeMocknet(t *testing.T) {
 	for _, v := range valSet {
 		valSetList = append(valSetList, &v)
 	}
-	ss := newSnapshotStore()
+	ss := newSnapshotStore(bs1)
 	genCfg := config.DefaultGenesisConfig()
 	genCfg.Leader = ktypes.PublicKey{PublicKey: privKeys[0].Public()}
@@ -264,7 +264,7 @@
 	}, accounts1, vstore1)
 	require.NoError(t, err)
-	migrator, err := migrations.SetupMigrator(ctx, db1, newSnapshotStore(), accounts1, filepath.Join(root1, "migrations"), mparams, vstore1, log.New(log.WithName("MIGRATOR")))
+	migrator, err := migrations.SetupMigrator(ctx, db1, newSnapshotStore(bs1), accounts1, filepath.Join(root1, "migrations"), mparams, vstore1, log.New(log.WithName("MIGRATOR")))
 	require.NoError(t, err)
 	bpl1 := log.New(log.WithName("BP1"), log.WithWriter(os.Stdout), log.WithLevel(log.LevelDebug), log.WithFormat(log.FormatUnstructured))
@@ -338,7 +338,7 @@
 	_, vstore2, err := voting.NewResolutionStore(ctx, db2)
 	require.NoError(t, err)
-	migrator2, err := migrations.SetupMigrator(ctx, db2, newSnapshotStore(), accounts2, filepath.Join(root2, "migrations"), mparams, vstore2, log.New(log.WithName("MIGRATOR")))
+	migrator2, err := migrations.SetupMigrator(ctx, db2, newSnapshotStore(bs2), accounts2, filepath.Join(root2, "migrations"), mparams, vstore2, log.New(log.WithName("MIGRATOR")))
 	require.NoError(t, err)
 	signer2 := auth.GetNodeSigner(privKeys[1])
diff --git a/node/node_test.go b/node/node_test.go
index 2559dcfee..a93887b94 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -111,7 +111,7 @@ func makeTestHosts(t *testing.T, nNodes, nExtraHosts int, blockInterval time.Dur
 		Statesync:   &defaultConfigSet.StateSync,
 		Mempool:     mempool.New(),
 		BlockStore:  bs,
-		Snapshotter: newSnapshotStore(),
+		Snapshotter: newSnapshotStore(bs),
 		Consensus:   ce,
 		BlockProc:   &dummyBP{},
 	}
diff --git a/node/statesync_test.go b/node/statesync_test.go
index 03fbb0dd7..83cbe44ad 100644
--- a/node/statesync_test.go
+++ b/node/statesync_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"crypto/sha256"
 	"encoding/hex"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"os"
@@ -20,6 +21,7 @@ import (
 	dht "github.com/libp2p/go-libp2p-kad-dht"
 	"github.com/libp2p/go-libp2p/core/discovery"
 	"github.com/libp2p/go-libp2p/core/host"
+	"github.com/libp2p/go-libp2p/core/network"
 	mock "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -47,97 +49,6 @@ var ( } ) -type mockBS struct { -} - -func (m *mockBS) GetByHeight(height int64) (types.Hash, *ktypes.Block, *types.CommitInfo, error) { - return types.Hash{}, nil, &types.CommitInfo{AppHash: types.Hash{}}, nil -} - -func (m *mockBS) Store(*ktypes.Block, *types.CommitInfo) error { - return nil -} - -func (m *mockBS) Best() (int64, types.Hash, types.Hash, time.Time) { - return 0, types.Hash{}, types.Hash{}, time.Time{} -} - -type snapshotStore struct { - snapshots map[uint64]*snapshotMetadata -} - -func newSnapshotStore() *snapshotStore { - return &snapshotStore{ - snapshots: make(map[uint64]*snapshotMetadata), - } -} - -func (s *snapshotStore) addSnapshot(snapshot *snapshotMetadata) { - s.snapshots[snapshot.Height] = snapshot -} - -func (s *snapshotStore) ListSnapshots() []*snapshotter.Snapshot { - snapshots := make([]*snapshotter.Snapshot, 0, len(s.snapshots)) - for _, snapshot := range s.snapshots { - snap := &snapshotter.Snapshot{ - Height: snapshot.Height, - Format: snapshot.Format, - ChunkCount: snapshot.Chunks, - SnapshotSize: snapshot.Size, - SnapshotHash: snapshot.Hash, - ChunkHashes: make([][32]byte, len(snapshot.ChunkHashes)), - } - - for j, hash := range snapshot.ChunkHashes { - copy(snap.ChunkHashes[j][:], hash[:]) - } - - snapshots = append(snapshots, snap) - } - return snapshots -} - -func (s *snapshotStore) LoadSnapshotChunk(height uint64, format uint32, index uint32) ([]byte, error) { - snapshot, ok := s.snapshots[height] - if !ok { - return nil, errors.New("snapshot not found") - } - - if index >= snapshot.Chunks { - return nil, errors.New("chunk not found") - } - - return []byte("snapshot"), nil -} - -func (s *snapshotStore) GetSnapshot(height uint64, format uint32) *snapshotter.Snapshot { - snapshot, ok := s.snapshots[height] - if !ok { - return nil - } - - return &snapshotter.Snapshot{ - Height: snapshot.Height, - Format: snapshot.Format, - ChunkCount: snapshot.Chunks, - SnapshotSize: snapshot.Size, - SnapshotHash: snapshot.Hash, - ChunkHashes: snapshot.ChunkHashes, - } -} - -func (s *snapshotStore) Enabled() bool { - return true -} - -func (s *snapshotStore) IsSnapshotDue(height uint64) bool { - return false -} - -func (s *snapshotStore) CreateSnapshot(ctx context.Context, height uint64, snapshotID string, schemas, excludedTables []string, excludeTableData []string) error { - return nil -} - func newTestStatesyncer(ctx context.Context, t *testing.T, mn mock.Mocknet, rootDir string, sCfg *config.StateSyncConfig) (host.Host, discovery.Discovery, *snapshotStore, *StateSyncService, crypto.PrivateKey, error) { pkBts, h := newTestHost(t, mn) pk, err := crypto.UnmarshalSecp256k1PrivateKey(pkBts) @@ -155,7 +66,7 @@ func newTestStatesyncer(ctx context.Context, t *testing.T, mn mock.Mocknet, root os.MkdirAll(rootDir, os.ModePerm) bs := &mockBS{} - st := newSnapshotStore() + st := newSnapshotStore(bs) cfg := &StatesyncConfig{ StateSyncCfg: sCfg, RcvdSnapsDir: rootDir, @@ -172,6 +83,10 @@ func newTestStatesyncer(ctx context.Context, t *testing.T, mn mock.Mocknet, root return nil, nil, nil, nil, nil, err } + h.SetStreamHandler(snapshotter.ProtocolIDSnapshotCatalog, st.snapshotCatalogRequestHandler) + h.SetStreamHandler(snapshotter.ProtocolIDSnapshotChunk, st.snapshotChunkRequestHandler) + h.SetStreamHandler(snapshotter.ProtocolIDSnapshotMeta, st.snapshotMetadataRequestHandler) + return h, discover, st, ss, pk, nil } @@ 
@@ -240,43 +155,237 @@ func TestStateSyncService(t *testing.T) {
 	// Request the snapshot catalogs
 	for _, p := range peers {
 		err = ss3.requestSnapshotCatalogs(ctx, p)
-		require.Error(t, err)
+		require.NoError(t, err)
 	}
 	// should receive the snapshot catalog: snap1 from h2
 	snaps := ss3.snapshotPool.listSnapshots()
-	require.Len(t, snaps, 0)
+	require.Len(t, snaps, 1)
 	// best snapshot should be snap1
 	bestSnap, err := ss3.bestSnapshot()
-	if err == nil {
-		assert.Equal(t, snap1.Height, bestSnap.Height)
-		assert.Equal(t, snap1.Hash, bestSnap.Hash)
+	require.NoError(t, err)
+	assert.Equal(t, snap1.Height, bestSnap.Height)
+	assert.Equal(t, snap1.Hash, bestSnap.Hash)
-		// Validate the snapshot should fail as the trusted provider does not have the snapshot
-		valid, _ := ss3.VerifySnapshot(ctx, snap1)
-		assert.False(t, valid)
+	// Validate the snapshot should fail as the trusted provider does not have the snapshot
+	valid, _ := ss3.VerifySnapshot(ctx, snap1)
+	assert.False(t, valid)
-		// add snap1 to the trusted provider
-		st1.addSnapshot(snap1)
+	// add snap1 to the trusted provider
+	st1.addSnapshot(snap1)
-		valid, _ = ss3.VerifySnapshot(ctx, snap1)
-		assert.True(t, valid)
+	valid, _ = ss3.VerifySnapshot(ctx, snap1)
+	assert.True(t, valid)
-		// add snap2 to the trusted provider
-		st1.addSnapshot(snap2)
+	// add snap2 to the trusted provider
+	st1.addSnapshot(snap2)
+
+	// best snapshot should be snap2
+	for _, p := range peers {
+		err = ss3.requestSnapshotCatalogs(ctx, p)
+		require.NoError(t, err)
+	}
-		// best snapshot should be snap2
-		for _, p := range peers {
-			err = ss3.requestSnapshotCatalogs(ctx, p)
-			require.NoError(t, err)
+	bestSnap, err = ss3.bestSnapshot()
+	require.NoError(t, err)
+	assert.Equal(t, snap2.Height, bestSnap.Height)
+
+	valid, _ = ss3.VerifySnapshot(ctx, bestSnap)
+	assert.True(t, valid)
+}
+
+type mockBS struct {
+}
+
+func (m *mockBS) GetByHeight(height int64) (types.Hash, *ktypes.Block, *types.CommitInfo, error) {
+	return types.Hash{}, nil, &types.CommitInfo{AppHash: types.Hash{}}, nil
+}
+
+func (m *mockBS) Store(*ktypes.Block, *types.CommitInfo) error {
+	return nil
+}
+
+func (m *mockBS) Best() (int64, types.Hash, types.Hash, time.Time) {
+	return 0, types.Hash{}, types.Hash{}, time.Time{}
+}
+
+type snapshotStore struct {
+	snapshots map[uint64]*snapshotMetadata
+	bs        blockStore
+}
+
+func newSnapshotStore(bs blockStore) *snapshotStore {
+	return &snapshotStore{
+		snapshots: make(map[uint64]*snapshotMetadata),
+		bs:        bs,
+	}
+}
+
+func (s *snapshotStore) addSnapshot(snapshot *snapshotMetadata) {
+	s.snapshots[snapshot.Height] = snapshot
+}
+
+func (s *snapshotStore) ListSnapshots() []*snapshotter.Snapshot {
+	snapshots := make([]*snapshotter.Snapshot, 0, len(s.snapshots))
+	for _, snapshot := range s.snapshots {
+		snap := &snapshotter.Snapshot{
+			Height:       snapshot.Height,
+			Format:       snapshot.Format,
+			ChunkCount:   snapshot.Chunks,
+			SnapshotSize: snapshot.Size,
+			SnapshotHash: snapshot.Hash,
+			ChunkHashes:  make([][32]byte, len(snapshot.ChunkHashes)),
 		}
-		bestSnap, err = ss3.bestSnapshot()
-		require.NoError(t, err)
-		assert.Equal(t, snap2.Height, bestSnap.Height)
+		for j, hash := range snapshot.ChunkHashes {
+			copy(snap.ChunkHashes[j][:], hash[:])
+		}
+
+		snapshots = append(snapshots, snap)
+	}
+	return snapshots
+}
+
+func (s *snapshotStore) LoadSnapshotChunk(height uint64, format uint32, index uint32) ([]byte, error) {
+	snapshot, ok := s.snapshots[height]
+	if !ok {
+		return nil, errors.New("snapshot not found")
+	}
+
+	if index >= snapshot.Chunks {
+		return nil, errors.New("chunk not found")
errors.New("chunk not found") + } + + return []byte("snapshot"), nil +} + +func (s *snapshotStore) GetSnapshot(height uint64, format uint32) *snapshotter.Snapshot { + snapshot, ok := s.snapshots[height] + if !ok { + return nil + } + + return &snapshotter.Snapshot{ + Height: snapshot.Height, + Format: snapshot.Format, + ChunkCount: snapshot.Chunks, + SnapshotSize: snapshot.Size, + SnapshotHash: snapshot.Hash, + ChunkHashes: snapshot.ChunkHashes, + } +} + +func (s *snapshotStore) Enabled() bool { + return true +} + +func (s *snapshotStore) IsSnapshotDue(height uint64) bool { + return false +} + +func (s *snapshotStore) CreateSnapshot(ctx context.Context, height uint64, snapshotID string, schemas, excludedTables []string, excludeTableData []string) error { + return nil +} + +func (s *snapshotStore) snapshotCatalogRequestHandler(stream network.Stream) { + defer stream.Close() + stream.SetReadDeadline(time.Now().Add(time.Second)) + + req := make([]byte, len(snapshotter.DiscoverSnapshotsMsg)) + n, err := stream.Read(req) + if err != nil { + return + } - valid, _ = ss3.VerifySnapshot(ctx, bestSnap) - assert.True(t, valid) + if n == 0 { // no request, hung up + return } + + snapshots := s.ListSnapshots() + if snapshots == nil { // nothing to send + stream.SetWriteDeadline(time.Now().Add(reqRWTimeout)) + stream.Write(noData) + return + } + + // send the snapshot catalogs + catalogs := make([]*snapshotter.SnapshotMetadata, len(snapshots)) + for i, snap := range snapshots { + catalogs[i] = snapshotToMetadata(snap) + } + + encoder := json.NewEncoder(stream) + stream.SetWriteDeadline(time.Now().Add(catalogSendTimeout)) + if err := encoder.Encode(catalogs); err != nil { + return + } +} + +func (s *snapshotStore) snapshotChunkRequestHandler(stream network.Stream) { + defer stream.Close() + stream.SetReadDeadline(time.Now().Add(chunkGetTimeout)) + var req snapshotter.SnapshotChunkReq + if _, err := req.ReadFrom(stream); err != nil { + return + } + chunk, err := s.LoadSnapshotChunk(req.Height, req.Format, req.Index) + if err != nil { + stream.SetWriteDeadline(time.Now().Add(reqRWTimeout)) + stream.Write(noData) + return + } + stream.SetWriteDeadline(time.Now().Add(chunkSendTimeout)) + stream.Write(chunk) +} + +func (s *snapshotStore) snapshotMetadataRequestHandler(stream network.Stream) { + defer stream.Close() + stream.SetReadDeadline(time.Now().Add(chunkGetTimeout)) + var req snapshotter.SnapshotReq + if _, err := req.ReadFrom(stream); err != nil { + return + } + snap := s.GetSnapshot(req.Height, req.Format) + if snap == nil { + stream.SetWriteDeadline(time.Now().Add(reqRWTimeout)) + stream.Write(noData) + return + } + + meta := snapshotToMetadata(snap) + + // get the app hash from the db + _, _, ci, err := s.bs.GetByHeight(int64(snap.Height)) + if err != nil || ci == nil { + stream.SetWriteDeadline(time.Now().Add(reqRWTimeout)) + stream.Write(noData) + return + } + meta.AppHash = ci.AppHash[:] + + // send the snapshot data + encoder := json.NewEncoder(stream) + + stream.SetWriteDeadline(time.Now().Add(chunkSendTimeout)) + if err := encoder.Encode(meta); err != nil { + return + } +} + +func snapshotToMetadata(s *snapshotter.Snapshot) *snapshotter.SnapshotMetadata { + meta := &snapshotter.SnapshotMetadata{ + Height: s.Height, + Format: s.Format, + Chunks: s.ChunkCount, + Hash: s.SnapshotHash, + Size: s.SnapshotSize, + ChunkHashes: make([][32]byte, s.ChunkCount), + } + + for i, chunk := range s.ChunkHashes { + copy(meta.ChunkHashes[i][:], chunk[:]) + } + + return meta } diff --git 
index 2eb26ccca..93619da82 100644
--- a/test/integration/kwild_test.go
+++ b/test/integration/kwild_test.go
@@ -362,6 +362,115 @@ func TestStatesync(t *testing.T) {
 	}
 }
+
+// TestStatesyncWithValidatorUpdates verifies that nodes using statesync
+// correctly reflect changes to the validator set and maintain the
+// correct resolution state in the Votestore.
+func TestStatesyncWithValidatorUpdates(t *testing.T) {
+	for _, driver := range setup.AllDrivers {
+		t.Run(driver.String()+"_driver", func(t *testing.T) {
+			p := setup.SetupTests(t, &setup.TestConfig{
+				ClientDriver: driver,
+				Network: &setup.NetworkConfig{
+					Nodes: []*setup.NodeConfig{
+						setup.CustomNodeConfig(func(nc *setup.NodeConfig) {
+							nc.Configure = func(conf *config.Config) {
+								conf.Snapshots.Enable = true
+								conf.Snapshots.RecurringHeight = 50
+							}
+						}),
+						setup.CustomNodeConfig(func(nc *setup.NodeConfig) {
+							nc.Validator = false
+							nc.Configure = func(conf *config.Config) {
+								conf.Snapshots.Enable = true
+								conf.Snapshots.RecurringHeight = 50
+							}
+						}),
+						setup.CustomNodeConfig(func(nc *setup.NodeConfig) {
+							nc.Validator = false
+							nc.Configure = func(conf *config.Config) {
+								conf.StateSync.Enable = true
+							}
+						}),
+						setup.CustomNodeConfig(func(nc *setup.NodeConfig) {
+							nc.Validator = false
+							nc.Configure = func(conf *config.Config) {
+								conf.StateSync.Enable = true
+							}
+						}),
+					},
+					DBOwner: OwnerAddress,
+				},
+				InitialServices: []string{"node0", "node1", "pg0", "pg1"}, // should bring up only nodes 0 and 1
+			})
+			ctx := context.Background()
+
+			// wait for all the nodes to discover each other and to produce snapshots
+			clt := p.Nodes[0].JSONRPCClient(t, ctx, &setup.ClientOptions{
+				PrivateKey: UserPrivkey1,
+			})
+
+			specifications.CreateNamespaceSpecification(ctx, t, clt, false)
+			specifications.CreateSchemaSpecification(ctx, t, clt)
+
+			user := &specifications.User{Id: 1, Name: "Alice", Age: 25}
+			specifications.AddUserSpecification(ctx, t, clt, user)
+			specifications.ListUsersSpecification(ctx, t, clt, false, 1)
+
+			// node1 issues a join request to become a validator; before node0 approves the request,
+			// node2 joins the network and should see the join request from node1
+			n1Admin := p.Nodes[1].AdminClient(t, ctx)
+			specifications.ValidatorNodeJoinSpecification(ctx, t, n1Admin, p.Nodes[1].PrivateKey(), 1)
+
+			time.Sleep(45 * time.Second) // wait for node0 to produce a snapshot
+
+			// bring up node2, pg2 and ensure that it does blocksync correctly
+			p.RunServices(t, ctx, []*setup.ServiceDefinition{
+				setup.KwildServiceDefinition("node2"),
+				setup.PostgresServiceDefinition("pg2"),
+			}, 2*time.Minute)
+
+			// time for the node to blocksync and catch up
+			time.Sleep(4 * time.Second)
+
+			// ensure that all nodes are in sync
+			info, err := p.Nodes[2].JSONRPCClient(t, ctx, nil).ChainInfo(ctx)
+			require.NoError(t, err)
+			require.Greater(t, info.BlockHeight, uint64(50))
+
+			clt2 := p.Nodes[2].JSONRPCClient(t, ctx, &setup.ClientOptions{
+				PrivateKey: UserPrivkey1,
+			})
+			specifications.ListUsersSpecification(ctx, t, clt2, false, 1)
+
+			// node2 should see the join request from node1
+			n2Admin := p.Nodes[2].AdminClient(t, ctx)
+			specifications.ValidatorJoinStatusSpecification(ctx, t, n2Admin, p.Nodes[1].PrivateKey(), 1)
+
+			// node0 approves the join request
+			n0Admin := p.Nodes[0].AdminClient(t, ctx)
+			specifications.ValidatorNodeApproveSpecification(ctx, t, n0Admin, p.Nodes[1].PrivateKey(), 1, 2, true)
+
+			time.Sleep(50 * time.Second) // wait for node0 and node1 to produce a snapshot
snapshot + + p.RunServices(t, ctx, []*setup.ServiceDefinition{ + setup.KwildServiceDefinition("node3"), + setup.PostgresServiceDefinition("pg3"), + }, 2*time.Minute) + + time.Sleep(4 * time.Second) + + // node3 should see the updated validator set + n3Admin := p.Nodes[3].AdminClient(t, ctx) + specifications.CurrentValidatorsSpecification(ctx, t, n3Admin, 2) + + clt3 := p.Nodes[3].JSONRPCClient(t, ctx, &setup.ClientOptions{ + PrivateKey: UserPrivkey1, + }) + specifications.ListUsersSpecification(ctx, t, clt3, false, 1) + }) + } +} + func TestOfflineMigrations(t *testing.T) { for _, driver := range setup.AllDrivers { t.Run(driver.String()+"_driver", func(t *testing.T) { diff --git a/test/specifications/validator_ops.go b/test/specifications/validator_ops.go index 068cfcdcc..134ec227d 100644 --- a/test/specifications/validator_ops.go +++ b/test/specifications/validator_ops.go @@ -44,6 +44,16 @@ func ValidatorNodeJoinSpecification(ctx context.Context, t *testing.T, netops Va assert.NoError(t, err) assert.Equal(t, valCount, len(vals)) } +func ValidatorJoinStatusSpecification(ctx context.Context, t *testing.T, netops ValidatorOpsDsl, joinerKey crypto.PrivateKey, valCount int) { + t.Log("Executing network node join status specification") + + // Get Request status, #approvals = 0, #board = valCount + joiner := joinerKey.Public().Bytes() + joinStatus, err := netops.ValidatorJoinStatus(ctx, joiner, joinerKey.Type()) + require.NoError(t, err) + assert.Equal(t, valCount, len(joinStatus.Board)) + assert.Equal(t, 0, approvalCount(joinStatus)) +} func JoinExistingValidatorSpecification(ctx context.Context, t *testing.T, netops ValidatorOpsDsl, joinerKey crypto.PrivateKey) { t.Log("Executing existing validator join specification")