Skip to content

Commit

Permalink
core: improve snapshot journal recovery (ethereum#21594)
Browse files Browse the repository at this point in the history
* core/state/snapshot: introduce snapshot journal version

* core: update the disk layer in an atomic way

* core: persist the disk layer generator periodically

* core/state/snapshot: improve logging

* core/state/snapshot: forcibly ensure the legacy snapshot is matched

* core/state/snapshot: add debug logs

* core, tests: fix tests and special recovery case

* core: polish

* core: add more blockchain tests for snapshot recovery

* core/state: fix comment

* core: add recovery flag for snapshot

* core: add restart after start-after-crash tests

* core/rawdb: fix imports

* core: fix tests

* core: remove log

* core/state/snapshot: fix snapshot

* core: avoid callbacks in SetHead

* core: fix setHead cornercase where the threshold root has state

* core: small docs for the test cases

Co-authored-by: Péter Szilágyi <[email protected]>
  • Loading branch information
rjl493456442 and karalabe authored Oct 29, 2020
1 parent 43c278c commit b63e3c3
Show file tree
Hide file tree
Showing 11 changed files with 1,792 additions and 159 deletions.
92 changes: 79 additions & 13 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,10 @@ type BlockChain struct {
processor Processor // Block transaction processor interface
vmConfig vm.Config

badBlocks *lru.Cache // Bad block cache
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
badBlocks *lru.Cache // Bad block cache
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
writeLegacyJournal bool // Testing flag used to flush the snapshot journal in legacy format.
}

// NewBlockChain returns a fully initialised block chain using information
Expand Down Expand Up @@ -281,9 +282,29 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
// Make sure the state associated with the block is available
head := bc.CurrentBlock()
if _, err := state.New(head.Root(), bc.stateCache, bc.snaps); err != nil {
log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash())
if err := bc.SetHead(head.NumberU64()); err != nil {
return nil, err
// Head state is missing, before the state recovery, find out the
// disk layer point of snapshot(if it's enabled). Make sure the
// rewound point is lower than disk layer.
var diskRoot common.Hash
if bc.cacheConfig.SnapshotLimit > 0 {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
}
if diskRoot != (common.Hash{}) {
log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash(), "snaproot", diskRoot)

snapDisk, err := bc.SetHeadBeyondRoot(head.NumberU64(), diskRoot)
if err != nil {
return nil, err
}
// Chain rewound, persist old snapshot number to indicate recovery procedure
if snapDisk != 0 {
rawdb.WriteSnapshotRecoveryNumber(bc.db, snapDisk)
}
} else {
log.Warn("Head state missing, repairing", "number", head.Number(), "hash", head.Hash())
if err := bc.SetHead(head.NumberU64()); err != nil {
return nil, err
}
}
}
// Ensure that a previous crash in SetHead doesn't leave extra ancients
Expand Down Expand Up @@ -339,7 +360,18 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
}
// Load any existing snapshot, regenerating it if loading failed
if bc.cacheConfig.SnapshotLimit > 0 {
bc.snaps = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, bc.CurrentBlock().Root(), !bc.cacheConfig.SnapshotWait)
// If the chain was rewound past the snapshot persistent layer (causing
// a recovery block number to be persisted to disk), check if we're still
// in recovery mode and in that case, don't invalidate the snapshot on a
// head mismatch.
var recover bool

head := bc.CurrentBlock()
if layer := rawdb.ReadSnapshotRecoveryNumber(bc.db); layer != nil && *layer > head.NumberU64() {
log.Warn("Enabling snapshot recovery", "chainhead", head.NumberU64(), "diskbase", *layer)
recover = true
}
bc.snaps = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, head.Root(), !bc.cacheConfig.SnapshotWait, recover)
}
// Take ownership of this particular state
go bc.update()
Expand Down Expand Up @@ -444,9 +476,25 @@ func (bc *BlockChain) loadLastState() error {
// was fast synced or full synced and in which state, the method will try to
// delete minimal data from disk whilst retaining chain consistency.
func (bc *BlockChain) SetHead(head uint64) error {
_, err := bc.SetHeadBeyondRoot(head, common.Hash{})
return err
}

// SetHeadBeyondRoot rewinds the local chain to a new head with the extra condition
// that the rewind must pass the specified state root. This method is meant to be
// used when rewiding with snapshots enabled to ensure that we go back further than
// persistent disk layer. Depending on whether the node was fast synced or full, and
// in which state, the method will try to delete minimal data from disk whilst
// retaining chain consistency.
//
// The method returns the block number where the requested root cap was found.
func (bc *BlockChain) SetHeadBeyondRoot(head uint64, root common.Hash) (uint64, error) {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

// Track the block number of the requested root hash
var rootNumber uint64 // (no root == always 0)

// Retrieve the last pivot block to short circuit rollbacks beyond it and the
// current freezer limit to start nuking id underflown
pivot := rawdb.ReadLastPivotNumber(bc.db)
Expand All @@ -462,8 +510,16 @@ func (bc *BlockChain) SetHead(head uint64) error {
log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash())
newHeadBlock = bc.genesisBlock
} else {
// Block exists, keep rewinding until we find one with state
// Block exists, keep rewinding until we find one with state,
// keeping rewinding until we exceed the optional threshold
// root hash
beyondRoot := (root == common.Hash{}) // Flag whether we're beyond the requested root (no root, always true)

for {
// If a root threshold was requested but not yet crossed, check
if root != (common.Hash{}) && !beyondRoot && newHeadBlock.Root() == root {
beyondRoot, rootNumber = true, newHeadBlock.NumberU64()
}
if _, err := state.New(newHeadBlock.Root(), bc.stateCache, bc.snaps); err != nil {
log.Trace("Block state missing, rewinding further", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
if pivot == nil || newHeadBlock.NumberU64() > *pivot {
Expand All @@ -474,8 +530,12 @@ func (bc *BlockChain) SetHead(head uint64) error {
newHeadBlock = bc.genesisBlock
}
}
log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
break
if beyondRoot || newHeadBlock.NumberU64() == 0 {
log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
break
}
log.Debug("Skipping block with threshold state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "root", newHeadBlock.Root())
newHeadBlock = bc.GetBlock(newHeadBlock.ParentHash(), newHeadBlock.NumberU64()-1) // Keep rewinding
}
}
rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash())
Expand Down Expand Up @@ -555,7 +615,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
bc.txLookupCache.Purge()
bc.futureBlocks.Purge()

return bc.loadLastState()
return rootNumber, bc.loadLastState()
}

// FastSyncCommitHead sets the current head block to the one defined by the hash
Expand Down Expand Up @@ -940,8 +1000,14 @@ func (bc *BlockChain) Stop() {
var snapBase common.Hash
if bc.snaps != nil {
var err error
if snapBase, err = bc.snaps.Journal(bc.CurrentBlock().Root()); err != nil {
log.Error("Failed to journal state snapshot", "err", err)
if bc.writeLegacyJournal {
if snapBase, err = bc.snaps.LegacyJournal(bc.CurrentBlock().Root()); err != nil {
log.Error("Failed to journal state snapshot", "err", err)
}
} else {
if snapBase, err = bc.snaps.Journal(bc.CurrentBlock().Root()); err != nil {
log.Error("Failed to journal state snapshot", "err", err)
}
}
}
// Ensure the state of a recent block is also stored to disk before exiting.
Expand Down
Loading

0 comments on commit b63e3c3

Please sign in to comment.