Separate storing the previous state of LastValidators/LastProofHash for `statesync`
tnasu committed Jul 27, 2021
1 parent f24370b commit 4672023
Showing 6 changed files with 53 additions and 39 deletions.
9 changes: 8 additions & 1 deletion node/node.go
@@ -593,11 +593,18 @@ func startStateSync(ssR *statesync.Reactor, bcR fastSyncReactor, conR *cs.Reacto
}

go func() {
state, commit, err := ssR.Sync(stateProvider, config.DiscoveryTime)
state, previousState, commit, err := ssR.Sync(stateProvider, config.DiscoveryTime)
if err != nil {
ssR.Logger.Error("State sync failed", "err", err)
return
}
if previousState.LastBlockHeight > 0 {
err = stateStore.Bootstrap(previousState)
if err != nil {
ssR.Logger.Error("Failed to bootstrap node with previous state", "err", err)
return
}
}
err = stateStore.Bootstrap(state)
if err != nil {
ssR.Logger.Error("Failed to bootstrap node with new state", "err", err)
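For orientation, the goroutine above can be condensed into a single helper. This is a minimal sketch, not code from the commit: the helper name `bootstrapFromStateSync` is hypothetical, the `sm.Store` and `statesync.StateProvider` parameter types are assumptions based on the identifiers visible in the diff, and imports plus the rest of the goroutine (persisting the commit, handing off to fast sync) are omitted.

```go
// bootstrapFromStateSync sketches the new caller flow: run state sync, then
// bootstrap the previous state (when one exists) before the snapshot state,
// so the store also holds validator/proof-hash data for the preceding height.
func bootstrapFromStateSync(
	ssR *statesync.Reactor,
	stateStore sm.Store,
	stateProvider statesync.StateProvider,
	discoveryTime time.Duration,
) error {
	state, previousState, commit, err := ssR.Sync(stateProvider, discoveryTime)
	if err != nil {
		return fmt.Errorf("state sync failed: %w", err)
	}
	if previousState.LastBlockHeight > 0 {
		// Persist the height-1 record first; a zero LastBlockHeight means the
		// snapshot sits at the first height and there is nothing to store.
		if err := stateStore.Bootstrap(previousState); err != nil {
			return fmt.Errorf("bootstrap previous state: %w", err)
		}
	}
	if err := stateStore.Bootstrap(state); err != nil {
		return fmt.Errorf("bootstrap new state: %w", err)
	}
	_ = commit // the real caller also stores this commit in the block store
	return nil
}
```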
8 changes: 0 additions & 8 deletions state/store.go
@@ -212,14 +212,6 @@ func (store dbStore) Bootstrap(state State) error {
height = state.InitialHeight
}

if height > 1 && !state.LastVoters.IsNilOrEmpty() {
// TODO 🏺Can apply empty bytes for the ProofHash corresponding to LastValidators? and LastVoters as LastValidators?
vals := types.NewValidatorSet(state.LastVoters.Voters)
if err := store.saveValidatorsInfo(height-1, height-1, vals); err != nil {
return err
}
}

if err := store.saveValidatorsInfo(height, height, state.Validators); err != nil {
return err
}
11 changes: 6 additions & 5 deletions statesync/reactor.go
@@ -249,13 +249,14 @@ func (r *Reactor) recentSnapshots(n uint32) ([]*snapshot, error) {
return snapshots, nil
}

// Sync runs a state sync, returning the new state and last commit at the snapshot height.
// Sync runs a state sync, returning the new state, previous state and last commit at the snapshot height.
// The caller must store the state and commit in the state database and block store.
func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) (sm.State, *types.Commit, error) {
func (r *Reactor) Sync(
stateProvider StateProvider, discoveryTime time.Duration) (sm.State, sm.State, *types.Commit, error) {
r.mtx.Lock()
if r.syncer != nil {
r.mtx.Unlock()
return sm.State{}, nil, errors.New("a state sync is already in progress")
return sm.State{}, sm.State{}, nil, errors.New("a state sync is already in progress")
}
r.syncer = newSyncer(r.Logger, r.conn, r.connQuery, stateProvider, r.tempDir)
r.mtx.Unlock()
Expand All @@ -264,9 +265,9 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
r.Logger.Debug("Requesting snapshots from known peers")
r.Switch.Broadcast(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{}))

state, commit, err := r.syncer.SyncAny(discoveryTime)
state, previousState, commit, err := r.syncer.SyncAny(discoveryTime)
r.mtx.Lock()
r.syncer = nil
r.mtx.Unlock()
return state, commit, err
return state, previousState, commit, err
}
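Callers of the widened `Reactor.Sync` signature that have no use for the previous state can simply discard it with a blank identifier, which is what the `test/maverick/node/node.go` hunk at the end of this diff does. A minimal sketch of that pattern, assuming the same `ssR`, `stateProvider`, and `config` values as in `node/node.go`:

```go
// Inside the state-sync goroutine: the previous state is discarded because this
// caller only bootstraps the snapshot state, matching the pre-change behaviour.
state, _, commit, err := ssR.Sync(stateProvider, config.DiscoveryTime)
if err != nil {
	ssR.Logger.Error("State sync failed", "err", err)
	return
}
// ... continue with stateStore.Bootstrap(state) and storing commit as before.
```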
44 changes: 27 additions & 17 deletions statesync/syncer.go
@@ -123,9 +123,9 @@ func (s *syncer) RemovePeer(peer p2p.Peer) {
}

// SyncAny tries to sync any of the snapshots in the snapshot pool, waiting to discover further
// snapshots if none were found and discoveryTime > 0. It returns the latest state and block commit
// snapshots if none were found and discoveryTime > 0. It returns the latest state, previous state and block commit
// which the caller must use to bootstrap the node.
func (s *syncer) SyncAny(discoveryTime time.Duration) (sm.State, *types.Commit, error) {
func (s *syncer) SyncAny(discoveryTime time.Duration) (sm.State, sm.State, *types.Commit, error) {
if discoveryTime > 0 {
s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
time.Sleep(discoveryTime)
@@ -146,7 +146,7 @@ func (s *syncer) SyncAny(discoveryTime time.Duration) (sm.State, *types.Commit,
}
if snapshot == nil {
if discoveryTime == 0 {
return sm.State{}, nil, errNoSnapshots
return sm.State{}, sm.State{}, nil, errNoSnapshots
}
s.logger.Info(fmt.Sprintf("Discovering snapshots for %v", discoveryTime))
time.Sleep(discoveryTime)
@@ -155,18 +155,18 @@ func (s *syncer) SyncAny(discoveryTime time.Duration) (sm.State, *types.Commit,
if chunks == nil {
chunks, err = newChunkQueue(snapshot, s.tempDir)
if err != nil {
return sm.State{}, nil, fmt.Errorf("failed to create chunk queue: %w", err)
return sm.State{}, sm.State{}, nil, fmt.Errorf("failed to create chunk queue: %w", err)
}
defer chunks.Close() // in case we forget to close it elsewhere
}

newState, commit, err := s.Sync(snapshot, chunks)
newState, previousState, commit, err := s.Sync(snapshot, chunks)
switch {
case err == nil:
return newState, commit, nil
return newState, previousState, commit, nil

case errors.Is(err, errAbort):
return sm.State{}, nil, err
return sm.State{}, sm.State{}, nil, err

case errors.Is(err, errRetrySnapshot):
chunks.RetryAll()
@@ -197,7 +197,7 @@ func (s *syncer) SyncAny(discoveryTime time.Duration) (sm.State, *types.Commit,
}

default:
return sm.State{}, nil, fmt.Errorf("snapshot restoration failed: %w", err)
return sm.State{}, sm.State{}, nil, fmt.Errorf("snapshot restoration failed: %w", err)
}

// Discard snapshot and chunks for next iteration
@@ -210,13 +210,13 @@ func (s *syncer) SyncAny(discoveryTime time.Duration) (sm.State, *types.Commit,
}
}

// Sync executes a sync for a specific snapshot, returning the latest state and block commit which
// Sync executes a sync for a specific snapshot, returning the latest state, previous state and block commit which
// the caller must use to bootstrap the node.
func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.Commit, error) {
func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, sm.State, *types.Commit, error) {
s.mtx.Lock()
if s.chunks != nil {
s.mtx.Unlock()
return sm.State{}, nil, errors.New("a state sync is already in progress")
return sm.State{}, sm.State{}, nil, errors.New("a state sync is already in progress")
}
s.chunks = chunks
s.mtx.Unlock()
@@ -229,7 +229,7 @@ func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.
// Offer snapshot to ABCI app.
err := s.offerSnapshot(snapshot)
if err != nil {
return sm.State{}, nil, err
return sm.State{}, sm.State{}, nil, err
}

// Spawn chunk fetchers. They will terminate when the chunk queue is closed or context cancelled.
@@ -245,31 +245,41 @@ func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.
// Optimistically build new state, so we don't discover any light client failures at the end.
state, err := s.stateProvider.State(pctx, snapshot.Height)
if err != nil {
return sm.State{}, nil, fmt.Errorf("failed to build new state: %w", err)
return sm.State{}, sm.State{}, nil, fmt.Errorf("failed to build new state: %w", err)
}
var previousState sm.State
if snapshot.Height-1 < 1 {
previousState = sm.State{}
} else {
previousState, err = s.stateProvider.State(pctx, snapshot.Height-1)
if err != nil {
return sm.State{}, sm.State{}, nil, fmt.Errorf("failed to build new prvious state: %w", err)
}
}
commit, err := s.stateProvider.Commit(pctx, snapshot.Height)
if err != nil {
return sm.State{}, nil, fmt.Errorf("failed to fetch commit: %w", err)
return sm.State{}, sm.State{}, nil, fmt.Errorf("failed to fetch commit: %w", err)
}

// Restore snapshot
err = s.applyChunks(chunks)
if err != nil {
return sm.State{}, nil, err
return sm.State{}, sm.State{}, nil, err
}

// Verify app and update app version
appVersion, err := s.verifyApp(snapshot)
if err != nil {
return sm.State{}, nil, err
return sm.State{}, sm.State{}, nil, err
}
state.Version.Consensus.App = appVersion
previousState.Version.Consensus.App = appVersion

// Done! 🎉
s.logger.Info("Snapshot restored", "height", snapshot.Height, "format", snapshot.Format,
"hash", snapshot.Hash)

return state, commit, nil
return state, previousState, commit, nil
}

// offerSnapshot offers a snapshot to the app. It returns various errors depending on the app's
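The previous-state lookup added to `syncer.Sync` above is guarded so that a snapshot at the chain's first height simply yields an empty `sm.State{}`; its `LastBlockHeight` stays 0, which is what the caller in `node/node.go` checks before the extra `Bootstrap`. That guard can be read as the following standalone sketch; the helper name `previousStateFor` is hypothetical and imports are omitted.

```go
// previousStateFor returns the state preceding the snapshot height, or a zero
// state when the snapshot is at the first height and no predecessor exists.
func previousStateFor(ctx context.Context, sp StateProvider, snapshotHeight uint64) (sm.State, error) {
	if snapshotHeight <= 1 {
		// Nothing precedes height 1; callers detect this via LastBlockHeight == 0.
		return sm.State{}, nil
	}
	prev, err := sp.State(ctx, snapshotHeight-1)
	if err != nil {
		return sm.State{}, fmt.Errorf("failed to build previous state: %w", err)
	}
	return prev, nil
}
```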
18 changes: 11 additions & 7 deletions statesync/syncer_test.go
@@ -188,7 +188,7 @@ func TestSyncer_SyncAny(t *testing.T) {
LastBlockAppHash: []byte("app_hash"),
}, nil)

newState, lastCommit, err := syncer.SyncAny(0)
newState, previousState, lastCommit, err := syncer.SyncAny(0)
require.NoError(t, err)

time.Sleep(50 * time.Millisecond) // wait for peers to receive requests
@@ -201,7 +201,11 @@ func TestSyncer_SyncAny(t *testing.T) {
expectState := state
expectState.Version.Consensus.App = 9

expectPreviousState := sm.State{}
expectPreviousState.Version.Consensus.App = expectState.Version.Consensus.App

assert.Equal(t, expectState, newState)
assert.Equal(t, expectPreviousState, previousState)
assert.Equal(t, commit, lastCommit)

connSnapshot.AssertExpectations(t)
@@ -212,7 +216,7 @@ func TestSyncer_SyncAny(t *testing.T) {

func TestSyncer_SyncAny_noSnapshots(t *testing.T) {
syncer, _ := setupOfferSyncer(t)
_, _, err := syncer.SyncAny(0)
_, _, _, err := syncer.SyncAny(0)
assert.Equal(t, errNoSnapshots, err)
}

@@ -226,7 +230,7 @@ func TestSyncer_SyncAny_abort(t *testing.T) {
Snapshot: toABCI(s), AppHash: []byte("app_hash"),
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)

_, _, err = syncer.SyncAny(0)
_, _, _, err = syncer.SyncAny(0)
assert.Equal(t, errAbort, err)
connSnapshot.AssertExpectations(t)
}
@@ -257,7 +261,7 @@ func TestSyncer_SyncAny_reject(t *testing.T) {
Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)

_, _, err = syncer.SyncAny(0)
_, _, _, err = syncer.SyncAny(0)
assert.Equal(t, errNoSnapshots, err)
connSnapshot.AssertExpectations(t)
}
@@ -284,7 +288,7 @@ func TestSyncer_SyncAny_reject_format(t *testing.T) {
Snapshot: toABCI(s11), AppHash: []byte("app_hash"),
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_ABORT}, nil)

_, _, err = syncer.SyncAny(0)
_, _, _, err = syncer.SyncAny(0)
assert.Equal(t, errAbort, err)
connSnapshot.AssertExpectations(t)
}
@@ -322,7 +326,7 @@ func TestSyncer_SyncAny_reject_sender(t *testing.T) {
Snapshot: toABCI(sa), AppHash: []byte("app_hash"),
}).Once().Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT}, nil)

_, _, err = syncer.SyncAny(0)
_, _, _, err = syncer.SyncAny(0)
assert.Equal(t, errNoSnapshots, err)
connSnapshot.AssertExpectations(t)
}
@@ -338,7 +342,7 @@ func TestSyncer_SyncAny_abciError(t *testing.T) {
Snapshot: toABCI(s), AppHash: []byte("app_hash"),
}).Once().Return(nil, errBoom)

_, _, err = syncer.SyncAny(0)
_, _, _, err = syncer.SyncAny(0)
assert.True(t, errors.Is(err, errBoom))
connSnapshot.AssertExpectations(t)
}
2 changes: 1 addition & 1 deletion test/maverick/node/node.go
@@ -648,7 +648,7 @@ func startStateSync(ssR *statesync.Reactor, bcR fastSyncReactor, conR *cs.Reacto
}

go func() {
state, commit, err := ssR.Sync(stateProvider, config.DiscoveryTime)
state, _, commit, err := ssR.Sync(stateProvider, config.DiscoveryTime)
if err != nil {
ssR.Logger.Error("State sync failed", "err", err)
return
