From c9e6cee787982d5c7b82a26485a3539122a25c20 Mon Sep 17 00:00:00 2001 From: zjubfd <296179868@qq.com> Date: Wed, 26 Jan 2022 14:12:18 +0800 Subject: [PATCH] [R4R] implement State Verification && Snapshot Commit pipeline (#668) * pipeline commit trie add metrics reopen trie * add unit testcase * resolve keefe's comment * resolve igor's comments * update prefetch remove prefetcher * no need to return error for precacheTransaction * fix lint issue * add some comments * remove useless code * add default option is false * fix diffsync nil point * fix panic on GetProofByHash Co-authored-by: zjubfd --- cmd/evm/internal/t8ntool/execution.go | 8 +- cmd/evm/runner.go | 4 +- cmd/evm/staterunner.go | 3 +- cmd/geth/main.go | 1 + cmd/utils/flags.go | 7 + consensus/clique/clique.go | 4 + core/block_validator.go | 29 ++- core/blockchain.go | 230 ++++++++++++++-------- core/blockchain_diff_test.go | 8 +- core/blockchain_test.go | 268 ++++++++++++++++++-------- core/chain_makers.go | 2 +- core/error.go | 3 + core/genesis.go | 2 +- core/state/database.go | 3 + core/state/snapshot/difflayer.go | 36 +++- core/state/snapshot/difflayer_test.go | 36 ++-- core/state/snapshot/disklayer.go | 14 +- core/state/snapshot/disklayer_test.go | 8 +- core/state/snapshot/iterator_test.go | 90 ++++----- core/state/snapshot/journal.go | 2 +- core/state/snapshot/snapshot.go | 19 +- core/state/snapshot/snapshot_test.go | 24 +-- core/state/state_test.go | 12 +- core/state/statedb.go | 256 ++++++++++++++++++------ core/state/statedb_test.go | 35 +++- core/state/sync_test.go | 4 +- core/state/trie_prefetcher.go | 22 ++- core/state_prefetcher.go | 37 ++-- core/state_processor.go | 20 +- core/types.go | 2 +- eth/api_test.go | 6 +- eth/backend.go | 3 + eth/ethconfig/config.go | 1 + eth/state_accessor.go | 4 +- eth/tracers/api.go | 4 +- ethclient/ethclient_test.go | 1 + miner/worker.go | 5 + params/protocol_params.go | 1 - tests/state_test_util.go | 8 +- trie/database.go | 58 ++++-- 40 files changed, 877 insertions(+), 403 deletions(-) diff --git a/cmd/evm/internal/t8ntool/execution.go b/cmd/evm/internal/t8ntool/execution.go index eb8b79ad1b..9eaead3878 100644 --- a/cmd/evm/internal/t8ntool/execution.go +++ b/cmd/evm/internal/t8ntool/execution.go @@ -249,7 +249,9 @@ func (pre *Prestate) Apply(vmConfig vm.Config, chainConfig *params.ChainConfig, statedb.AddBalance(pre.Env.Coinbase, minerReward) } // Commit block - root, _, err := statedb.Commit(chainConfig.IsEIP158(vmContext.BlockNumber)) + statedb.Finalise(chainConfig.IsEIP158(vmContext.BlockNumber)) + statedb.AccountsIntermediateRoot() + root, _, err := statedb.Commit(nil) if err != nil { fmt.Fprintf(os.Stderr, "Could not commit state: %v", err) return nil, nil, NewError(ErrorEVM, fmt.Errorf("could not commit state: %v", err)) @@ -280,7 +282,9 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc) *state.StateDB } } // Commit and re-open to start with a clean state. - root, _, _ := statedb.Commit(false) + statedb.Finalise(false) + statedb.AccountsIntermediateRoot() + root, _, _ := statedb.Commit(nil) statedb, _ = state.New(root, sdb, nil) return statedb } diff --git a/cmd/evm/runner.go b/cmd/evm/runner.go index 889de43e0a..6f2d97320d 100644 --- a/cmd/evm/runner.go +++ b/cmd/evm/runner.go @@ -268,7 +268,9 @@ func runCmd(ctx *cli.Context) error { output, leftOverGas, stats, err := timedExec(bench, execFunc) if ctx.GlobalBool(DumpFlag.Name) { - statedb.Commit(true) + statedb.Finalise(true) + statedb.AccountsIntermediateRoot() + statedb.Commit(nil) statedb.IntermediateRoot(true) fmt.Println(string(statedb.Dump(nil))) } diff --git a/cmd/evm/staterunner.go b/cmd/evm/staterunner.go index 90596d9b3c..6ffc2f51ca 100644 --- a/cmd/evm/staterunner.go +++ b/cmd/evm/staterunner.go @@ -102,7 +102,8 @@ func stateTestCmd(ctx *cli.Context) error { _, s, err := test.Run(st, cfg, false) // print state root for evmlab tracing if ctx.GlobalBool(MachineFlag.Name) && s != nil { - fmt.Fprintf(os.Stderr, "{\"stateRoot\": \"%x\"}\n", s.IntermediateRoot(false)) + root := s.IntermediateRoot(false) + fmt.Fprintf(os.Stderr, "{\"stateRoot\": \"%x\"}\n", root) } if err != nil { // Test failed, mark as so and dump any state to aid debugging diff --git a/cmd/geth/main.go b/cmd/geth/main.go index f56e632a27..8b8df383de 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -72,6 +72,7 @@ var ( utils.DirectBroadcastFlag, utils.DisableSnapProtocolFlag, utils.DiffSyncFlag, + utils.PipeCommitFlag, utils.RangeLimitFlag, utils.USBFlag, utils.SmartCardDaemonPathFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index cf5b80853f..3854d98f05 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -128,6 +128,10 @@ var ( Usage: "Enable diffy sync, Please note that enable diffsync will improve the syncing speed, " + "but will degrade the security to light client level", } + PipeCommitFlag = cli.BoolFlag{ + Name: "pipecommit", + Usage: "Enable MPT pipeline commit, it will improve syncing performance. It is an experimental feature(default is false)", + } RangeLimitFlag = cli.BoolFlag{ Name: "rangelimit", Usage: "Enable 5000 blocks limit for range query", @@ -1634,6 +1638,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.GlobalIsSet(DiffSyncFlag.Name) { cfg.DiffSync = ctx.GlobalBool(DiffSyncFlag.Name) } + if ctx.GlobalIsSet(PipeCommitFlag.Name) { + cfg.PipeCommit = ctx.GlobalBool(PipeCommitFlag.Name) + } if ctx.GlobalIsSet(RangeLimitFlag.Name) { cfg.RangeLimit = ctx.GlobalBool(RangeLimitFlag.Name) } diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index c8629136b2..0620b91a1b 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -575,7 +575,11 @@ func (c *Clique) Finalize(chain consensus.ChainHeaderReader, header *types.Heade func (c *Clique) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt) (*types.Block, []*types.Receipt, error) { // No block rewards in PoA, so the state remains as is and uncles are dropped + var err error header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number)) + if err != nil { + return nil, nil, err + } header.UncleHash = types.CalcUncleHash(nil) // Assemble and return the final block for sealing diff --git a/core/block_validator.go b/core/block_validator.go index ea5940d2dc..c944ac2d05 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -18,6 +18,7 @@ package core import ( "fmt" + "time" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core/state" @@ -26,6 +27,8 @@ import ( "github.com/ethereum/go-ethereum/trie" ) +const badBlockCacheExpire = 30 * time.Second + // BlockValidator is responsible for validating block headers, uncles and // processed state. // @@ -54,6 +57,9 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { if v.bc.HasBlockAndState(block.Hash(), block.NumberU64()) { return ErrKnownBlock } + if v.bc.isCachedBadBlock(block) { + return ErrKnownBadBlock + } // Header validity is known at this point, check the uncles and transactions header := block.Header() if err := v.engine.VerifyUncles(v.bc, block); err != nil { @@ -106,7 +112,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { // transition, such as amount of used gas, the receipt roots and the state root // itself. ValidateState returns a database batch if the validation was a success // otherwise nil and an error is returned. -func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64) error { +func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, receipts types.Receipts, usedGas uint64, skipHeavyVerify bool) error { header := block.Header() if block.GasUsed() != usedGas { return fmt.Errorf("invalid gas used (remote: %d local: %d)", block.GasUsed(), usedGas) @@ -125,17 +131,26 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD receiptSha := types.DeriveSha(receipts, trie.NewStackTrie(nil)) if receiptSha != header.ReceiptHash { return fmt.Errorf("invalid receipt root hash (remote: %x local: %x)", header.ReceiptHash, receiptSha) - } else { - return nil } + return nil }, - func() error { + } + if skipHeavyVerify { + validateFuns = append(validateFuns, func() error { + if err := statedb.WaitPipeVerification(); err != nil { + return err + } + statedb.Finalise(v.config.IsEIP158(header.Number)) + statedb.AccountsIntermediateRoot() + return nil + }) + } else { + validateFuns = append(validateFuns, func() error { if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root { return fmt.Errorf("invalid merkle root (remote: %x local: %x)", header.Root, root) - } else { - return nil } - }, + return nil + }) } validateRes := make(chan error, len(validateFuns)) for _, f := range validateFuns { diff --git a/core/blockchain.go b/core/blockchain.go index 7a50b14694..7825efca47 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -77,8 +77,9 @@ var ( blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil) blockReorgInvalidatedTx = metrics.NewRegisteredMeter("chain/reorg/invalidTx", nil) - errInsertionInterrupted = errors.New("insertion is interrupted") - errChainStopped = errors.New("blockchain is stopped") + errInsertionInterrupted = errors.New("insertion is interrupted") + errStateRootVerificationFailed = errors.New("state root verification failed") + errChainStopped = errors.New("blockchain is stopped") ) const ( @@ -88,6 +89,7 @@ const ( diffLayerRLPCacheLimit = 256 receiptsCacheLimit = 10000 txLookupCacheLimit = 1024 + maxBadBlockLimit = 16 maxFutureBlocks = 256 maxTimeFutureBlocks = 30 maxBeyondBlocks = 2048 @@ -100,6 +102,8 @@ const ( maxDiffForkDist = 11 // Maximum allowed backward distance from the chain head maxDiffLimitForBroadcast = 128 // Maximum number of unique diff layers a peer may have broadcasted + rewindBadBlockInterval = 1 * time.Second + // BlockChainVersion ensures that an incompatible database forces a resync from scratch. // // Changelog: @@ -178,10 +182,11 @@ type BlockChain struct { chainConfig *params.ChainConfig // Chain & network configuration cacheConfig *CacheConfig // Cache configuration for pruning - db ethdb.Database // Low level persistent database to store final content in - snaps *snapshot.Tree // Snapshot tree for fast trie leaf access - triegc *prque.Prque // Priority queue mapping block numbers to tries to gc - gcproc time.Duration // Accumulates canonical block processing for trie dumping + db ethdb.Database // Low level persistent database to store final content in + snaps *snapshot.Tree // Snapshot tree for fast trie leaf access + triegc *prque.Prque // Priority queue mapping block numbers to tries to gc + gcproc time.Duration // Accumulates canonical block processing for trie dumping + commitLock sync.Mutex // CommitLock is used to protect above field from being modified concurrently // txLookupLimit is the maximum number of blocks from head whose tx indices // are reserved: @@ -216,6 +221,7 @@ type BlockChain struct { blockCache *lru.Cache // Cache for the most recent entire blocks txLookupCache *lru.Cache // Cache for the most recent transaction lookup data. futureBlocks *lru.Cache // future blocks are blocks added for later processing + badBlockCache *lru.Cache // Cache for the blocks that failed to pass MPT root verification // trusted diff layers diffLayerCache *lru.Cache // Cache for the diffLayers @@ -243,6 +249,10 @@ type BlockChain struct { processor Processor // Block transaction processor interface forker *ForkChoice vmConfig vm.Config + pipeCommit bool + + shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. + terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion. } // NewBlockChain returns a fully initialised block chain using information @@ -263,6 +273,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par receiptsCache, _ := lru.New(receiptsCacheLimit) blockCache, _ := lru.New(blockCacheLimit) txLookupCache, _ := lru.New(txLookupCacheLimit) + badBlockCache, _ := lru.New(maxBadBlockLimit) + futureBlocks, _ := lru.New(maxFutureBlocks) diffLayerCache, _ := lru.New(diffLayerCacheLimit) diffLayerRLPCache, _ := lru.New(diffLayerRLPCacheLimit) @@ -284,6 +296,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par bodyRLPCache: bodyRLPCache, receiptsCache: receiptsCache, blockCache: blockCache, + badBlockCache: badBlockCache, diffLayerCache: diffLayerCache, diffLayerRLPCache: diffLayerRLPCache, txLookupCache: txLookupCache, @@ -466,7 +479,10 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par go bc.trustedDiffLayerLoop() } go bc.untrustedDiffLayerPruneLoop() - + if bc.pipeCommit { + // check current block and rewind invalid one + go bc.rewindInvalidHeaderBlockLoop() + } return bc, nil } @@ -578,24 +594,36 @@ func (bc *BlockChain) loadLastState() error { // was fast synced or full synced and in which state, the method will try to // delete minimal data from disk whilst retaining chain consistency. func (bc *BlockChain) SetHead(head uint64) error { + if !bc.chainmu.TryLock() { + return nil + } + defer bc.chainmu.Unlock() _, err := bc.setHeadBeyondRoot(head, common.Hash{}, false) return err } -// setHeadBeyondRoot rewinds the local chain to a new head with the extra condition -// that the rewind must pass the specified state root. This method is meant to be -// used when rewinding with snapshots enabled to ensure that we go back further than -// persistent disk layer. Depending on whether the node was fast synced or full, and -// in which state, the method will try to delete minimal data from disk whilst -// retaining chain consistency. -// -// The method returns the block number where the requested root cap was found. -func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bool) (uint64, error) { +func (bc *BlockChain) tryRewindBadBlocks() { if !bc.chainmu.TryLock() { - return 0, errChainStopped + return } defer bc.chainmu.Unlock() + block := bc.CurrentBlock() + snaps := bc.snaps + // Verified and Result is false + if snaps != nil && snaps.Snapshot(block.Root()) != nil && + snaps.Snapshot(block.Root()).Verified() && !snaps.Snapshot(block.Root()).WaitAndGetVerifyRes() { + // Rewind by one block + log.Warn("current block verified failed, rewind to its parent", "height", block.NumberU64(), "hash", block.Hash()) + bc.futureBlocks.Remove(block.Hash()) + bc.badBlockCache.Add(block.Hash(), time.Now()) + bc.diffLayerCache.Remove(block.Hash()) + bc.diffLayerRLPCache.Remove(block.Hash()) + bc.reportBlock(block, nil, errStateRootVerificationFailed) + bc.setHeadBeyondRoot(block.NumberU64()-1, common.Hash{}, false) + } +} +func (bc *BlockChain) setHeadBeyondRoot(head uint64, root common.Hash, repair bool) (uint64, error) { // Track the block number of the requested root hash var rootNumber uint64 // (no root == always 0) @@ -1390,8 +1418,77 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. } wg.Done() }() + + tryCommitTrieDB := func() error { + bc.commitLock.Lock() + defer bc.commitLock.Unlock() + + triedb := bc.stateCache.TrieDB() + // If we're running an archive node, always flush + if bc.cacheConfig.TrieDirtyDisabled { + err := triedb.Commit(block.Root(), false, nil) + if err != nil { + return err + } + } else { + // Full but not archive node, do proper garbage collection + triedb.Reference(block.Root(), common.Hash{}) // metadata reference to keep trie alive + bc.triegc.Push(block.Root(), -int64(block.NumberU64())) + + if current := block.NumberU64(); current > bc.triesInMemory { + // If we exceeded our memory allowance, flush matured singleton nodes to disk + var ( + nodes, imgs = triedb.Size() + limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024 + ) + if nodes > limit || imgs > 4*1024*1024 { + triedb.Cap(limit - ethdb.IdealBatchSize) + } + // Find the next state trie we need to commit + chosen := current - bc.triesInMemory + + // If we exceeded out time allowance, flush an entire trie to disk + if bc.gcproc > bc.cacheConfig.TrieTimeLimit { + canWrite := true + if posa, ok := bc.engine.(consensus.PoSA); ok { + if !posa.EnoughDistance(bc, block.Header()) { + canWrite = false + } + } + if canWrite { + // If the header is missing (canonical chain behind), we're reorging a low + // diff sidechain. Suspend committing until this operation is completed. + header := bc.GetHeaderByNumber(chosen) + if header == nil { + log.Warn("Reorg in progress, trie commit postponed", "number", chosen) + } else { + // If we're exceeding limits but haven't reached a large enough memory gap, + // warn the user that the system is becoming unstable. + if chosen < lastWrite+bc.triesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit { + log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/float64(bc.triesInMemory)) + } + // Flush an entire trie and restart the counters + triedb.Commit(header.Root, true, nil) + lastWrite = chosen + bc.gcproc = 0 + } + } + } + // Garbage collect anything below our required write retention + for !bc.triegc.Empty() { + root, number := bc.triegc.Pop() + if uint64(-number) > chosen { + bc.triegc.Push(root, number) + break + } + go triedb.Dereference(root.(common.Hash)) + } + } + } + return nil + } // Commit all cached state changes into underlying memory database. - root, diffLayer, err := state.Commit(bc.chainConfig.IsEIP158(block.Number())) + _, diffLayer, err := state.Commit(bc.tryRewindBadBlocks, tryCommitTrieDB) if err != nil { return err } @@ -1404,66 +1501,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. diffLayer.Number = block.NumberU64() bc.cacheDiffLayer(diffLayer) } - triedb := bc.stateCache.TrieDB() - - // If we're running an archive node, always flush - if bc.cacheConfig.TrieDirtyDisabled { - return triedb.Commit(root, false, nil) - } else { - // Full but not archive node, do proper garbage collection - triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive - bc.triegc.Push(root, -int64(block.NumberU64())) - - if current := block.NumberU64(); current > bc.triesInMemory { - // If we exceeded our memory allowance, flush matured singleton nodes to disk - var ( - nodes, imgs = triedb.Size() - limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024 - ) - if nodes > limit || imgs > 4*1024*1024 { - triedb.Cap(limit - ethdb.IdealBatchSize) - } - // Find the next state trie we need to commit - chosen := current - bc.triesInMemory - - // If we exceeded out time allowance, flush an entire trie to disk - if bc.gcproc > bc.cacheConfig.TrieTimeLimit { - canWrite := true - if posa, ok := bc.engine.(consensus.PoSA); ok { - if !posa.EnoughDistance(bc, block.Header()) { - canWrite = false - } - } - if canWrite { - // If the header is missing (canonical chain behind), we're reorging a low - // diff sidechain. Suspend committing until this operation is completed. - header := bc.GetHeaderByNumber(chosen) - if header == nil { - log.Warn("Reorg in progress, trie commit postponed", "number", chosen) - } else { - // If we're exceeding limits but haven't reached a large enough memory gap, - // warn the user that the system is becoming unstable. - if chosen < lastWrite+bc.triesInMemory && bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit { - log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/float64(bc.triesInMemory)) - } - // Flush an entire trie and restart the counters - triedb.Commit(header.Root, true, nil) - lastWrite = chosen - bc.gcproc = 0 - } - } - } - // Garbage collect anything below our required write retention - for !bc.triegc.Empty() { - root, number := bc.triegc.Pop() - if uint64(-number) > chosen { - bc.triegc.Push(root, number) - break - } - go triedb.Dereference(root.(common.Hash)) - } - } - } + wg.Wait() return nil } @@ -1795,6 +1833,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) } //Process block using the parent state as reference point substart := time.Now() + if bc.pipeCommit { + statedb.EnablePipeCommit() + } + statedb.SetExpectedStateRoot(block.Root()) statedb, receipts, logs, usedGas, err := bc.processor.Process(block, statedb, bc.vmConfig) atomic.StoreUint32(&followupInterrupt, 1) activeState = statedb @@ -1815,7 +1857,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) // Validate the state using the default validator substart = time.Now() if !statedb.IsLightProcessed() { - if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { + if err := bc.validator.ValidateState(block, statedb, receipts, usedGas, bc.pipeCommit); err != nil { log.Error("validate state failed", "error", err) bc.reportBlock(block, receipts, err) return it.index, err @@ -2354,6 +2396,19 @@ func (bc *BlockChain) updateFutureBlocks() { } } +func (bc *BlockChain) rewindInvalidHeaderBlockLoop() { + recheck := time.NewTicker(rewindBadBlockInterval) + defer recheck.Stop() + for { + select { + case <-recheck.C: + bc.tryRewindBadBlocks() + case <-bc.quit: + return + } + } +} + func (bc *BlockChain) trustedDiffLayerLoop() { recheck := time.NewTicker(diffLayerFreezerRecheckInterval) bc.wg.Add(1) @@ -2740,6 +2795,18 @@ func (bc *BlockChain) maintainTxIndex(ancients uint64) { } } +func (bc *BlockChain) isCachedBadBlock(block *types.Block) bool { + if timeAt, exist := bc.badBlockCache.Get(block.Hash()); exist { + putAt := timeAt.(time.Time) + if time.Since(putAt) >= badBlockCacheExpire { + bc.badBlockCache.Remove(block.Hash()) + return false + } + return true + } + return false +} + // reportBlock logs a bad block error. func (bc *BlockChain) reportBlock(block *types.Block, receipts types.Receipts, err error) { rawdb.WriteBadBlock(bc.db, block) @@ -2793,6 +2860,11 @@ func EnableLightProcessor(bc *BlockChain) *BlockChain { return bc } +func EnablePipelineCommit(bc *BlockChain) *BlockChain { + bc.pipeCommit = true + return bc +} + func EnablePersistDiff(limit uint64) BlockChainOption { return func(chain *BlockChain) *BlockChain { chain.diffLayerFreezerBlockLimit = limit diff --git a/core/blockchain_diff_test.go b/core/blockchain_diff_test.go index 451a966589..2575843a92 100644 --- a/core/blockchain_diff_test.go +++ b/core/blockchain_diff_test.go @@ -317,6 +317,9 @@ func TestProcessDiffLayer(t *testing.T) { lightBackend.Chain().HandleDiffLayer(diff, "testpid", true) } _, err := lightBackend.chain.insertChain([]*types.Block{block}, true) + if err != nil { + t.Errorf("failed to insert block %v", err) + } if checks, exist := checkBlocks[i]; exist { for _, check := range checks.txs { s, _ := lightBackend.Chain().Snapshots().Snapshot(block.Root()).Storage(crypto.Keccak256Hash((*check.to)[:]), check.slot) @@ -325,9 +328,6 @@ func TestProcessDiffLayer(t *testing.T) { } } } - if err != nil { - t.Errorf("failed to insert block %v", err) - } } currentBlock := lightBackend.chain.CurrentBlock() nextBlock := fullBackend.chain.GetBlockByNumber(currentBlock.NumberU64() + 1) @@ -368,7 +368,7 @@ func TestFreezeDiffLayer(t *testing.T) { // Wait for the buffer to be zero. } // Minus one empty block. - if fullBackend.chain.diffQueue.Size() != blockNum-1 { + if fullBackend.chain.diffQueue.Size() > blockNum-1 && fullBackend.chain.diffQueue.Size() < blockNum-2 { t.Errorf("size of diff queue is wrong, expected: %d, get: %d", blockNum-1, fullBackend.chain.diffQueue.Size()) } diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 3c4357a861..4f11a44b5f 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -46,7 +46,8 @@ import ( // So we can deterministically seed different blockchains var ( canonicalSeed = 1 - forkSeed = 2 + forkSeed1 = 2 + forkSeed2 = 3 TestTriesInMemory = 128 ) @@ -54,14 +55,18 @@ var ( // newCanonical creates a chain database, and injects a deterministic canonical // chain. Depending on the full flag, if creates either a full block chain or a // header only chain. -func newCanonical(engine consensus.Engine, n int, full bool) (ethdb.Database, *BlockChain, error) { +func newCanonical(engine consensus.Engine, n int, full, pipeline bool) (ethdb.Database, *BlockChain, error) { var ( db = rawdb.NewMemoryDatabase() genesis = (&Genesis{BaseFee: big.NewInt(params.InitialBaseFee)}).MustCommit(db) ) // Initialize a fresh chain with only a genesis block - blockchain, _ := NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil) + var ops []BlockChainOption + if pipeline { + ops = append(ops, EnablePipelineCommit) + } + blockchain, _ := NewBlockChain(db, nil, params.AllEthashProtocolChanges, engine, vm.Config{}, nil, nil, ops...) // Create and inject the requested chain if n == 0 { return db, blockchain, nil @@ -83,9 +88,53 @@ func newGwei(n int64) *big.Int { } // Test fork of length N starting from block i -func testFork(t *testing.T, blockchain *BlockChain, i, n int, full bool, comparator func(td1, td2 *big.Int)) { +func testInvalidStateRootBlockImport(t *testing.T, blockchain *BlockChain, i, n int, pipeline bool) { // Copy old chain up to #i into a new db - db, blockchain2, err := newCanonical(ethash.NewFaker(), i, full) + db, blockchain2, err := newCanonical(ethash.NewFaker(), i, true, pipeline) + if err != nil { + t.Fatal("could not make new canonical in testFork", err) + } + defer blockchain2.Stop() + + // Assert the chains have the same header/block at #i + hash1 := blockchain.GetBlockByNumber(uint64(i)).Hash() + hash2 := blockchain2.GetBlockByNumber(uint64(i)).Hash() + if hash1 != hash2 { + t.Errorf("chain content mismatch at %d: have hash %v, want hash %v", i, hash2, hash1) + } + // Extend the newly created chain + blockChainB := makeBlockChain(blockchain2.CurrentBlock(), n, ethash.NewFaker(), db, forkSeed1) + for idx, block := range blockChainB { + block.SetRoot(common.Hash{0: byte(forkSeed1), 19: byte(idx)}) + } + previousBlock := blockchain.CurrentBlock() + // Sanity check that the forked chain can be imported into the original + if _, err := blockchain.InsertChain(blockChainB); err == nil { + t.Fatalf("failed to report insert error") + } + + time.Sleep(2 * rewindBadBlockInterval) + latestBlock := blockchain.CurrentBlock() + if latestBlock.Hash() != previousBlock.Hash() || latestBlock.NumberU64() != previousBlock.NumberU64() { + t.Fatalf("rewind do not take effect") + } + db, blockchain3, err := newCanonical(ethash.NewFaker(), i, true, pipeline) + if err != nil { + t.Fatal("could not make new canonical in testFork", err) + } + defer blockchain3.Stop() + + blockChainC := makeBlockChain(blockchain3.CurrentBlock(), n, ethash.NewFaker(), db, forkSeed2) + + if _, err := blockchain.InsertChain(blockChainC); err != nil { + t.Fatalf("failed to insert forking chain: %v", err) + } +} + +// Test fork of length N starting from block i +func testFork(t *testing.T, blockchain *BlockChain, i, n int, full, pipeline bool, comparator func(td1, td2 *big.Int)) { + // Copy old chain up to #i into a new db + db, blockchain2, err := newCanonical(ethash.NewFaker(), i, full, pipeline) if err != nil { t.Fatal("could not make new canonical in testFork", err) } @@ -109,12 +158,12 @@ func testFork(t *testing.T, blockchain *BlockChain, i, n int, full bool, compara headerChainB []*types.Header ) if full { - blockChainB = makeBlockChain(blockchain2.CurrentBlock(), n, ethash.NewFaker(), db, forkSeed) + blockChainB = makeBlockChain(blockchain2.CurrentBlock(), n, ethash.NewFaker(), db, forkSeed1) if _, err := blockchain2.InsertChain(blockChainB); err != nil { t.Fatalf("failed to insert forking chain: %v", err) } } else { - headerChainB = makeHeaderChain(blockchain2.CurrentHeader(), n, ethash.NewFaker(), db, forkSeed) + headerChainB = makeHeaderChain(blockchain2.CurrentHeader(), n, ethash.NewFaker(), db, forkSeed1) if _, err := blockchain2.InsertHeaderChain(headerChainB, 1); err != nil { t.Fatalf("failed to insert forking chain: %v", err) } @@ -145,7 +194,7 @@ func testFork(t *testing.T, blockchain *BlockChain, i, n int, full bool, compara // testBlockChainImport tries to process a chain of blocks, writing them into // the database if successful. -func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { +func testBlockChainImport(chain types.Blocks, pipelineCommit bool, blockchain *BlockChain) error { for _, block := range chain { // Try and process the block err := blockchain.engine.VerifyHeader(blockchain, block.Header(), true) @@ -162,12 +211,16 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { if err != nil { return err } + statedb.SetExpectedStateRoot(block.Root()) + if pipelineCommit { + statedb.EnablePipeCommit() + } statedb, receipts, _, usedGas, err := blockchain.processor.Process(block, statedb, vm.Config{}) if err != nil { blockchain.reportBlock(block, receipts, err) return err } - err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas) + err = blockchain.validator.ValidateState(block, statedb, receipts, usedGas, pipelineCommit) if err != nil { blockchain.reportBlock(block, receipts, err) return err @@ -176,7 +229,9 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { blockchain.chainmu.MustLock() rawdb.WriteTd(blockchain.db, block.Hash(), block.NumberU64(), new(big.Int).Add(block.Difficulty(), blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))) rawdb.WriteBlock(blockchain.db, block) - statedb.Commit(false) + statedb.Finalise(false) + statedb.AccountsIntermediateRoot() + statedb.Commit(nil) blockchain.chainmu.Unlock() } return nil @@ -199,8 +254,22 @@ func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error return nil } +func TestBlockImportVerification(t *testing.T) { + length := 5 + + // Make first chain starting from genesis + _, processor, err := newCanonical(ethash.NewFaker(), length, true, true) + if err != nil { + t.Fatalf("failed to make new canonical chain: %v", err) + } + defer processor.Stop() + // Start fork from current height + processor = EnablePipelineCommit(processor) + testInvalidStateRootBlockImport(t, processor, length, 10, true) +} + func TestLastBlock(t *testing.T) { - _, blockchain, err := newCanonical(ethash.NewFaker(), 0, true) + _, blockchain, err := newCanonical(ethash.NewFaker(), 0, true, false) if err != nil { t.Fatalf("failed to create pristine chain: %v", err) } @@ -266,14 +335,20 @@ func testInsertAfterMerge(t *testing.T, blockchain *BlockChain, i, n int, full b // Tests that given a starting canonical chain of a given size, it can be extended // with various length chains. -func TestExtendCanonicalHeaders(t *testing.T) { testExtendCanonical(t, false) } -func TestExtendCanonicalBlocks(t *testing.T) { testExtendCanonical(t, true) } +func TestExtendCanonicalHeaders(t *testing.T) { + testExtendCanonical(t, false, false) + +} +func TestExtendCanonicalBlocks(t *testing.T) { + testExtendCanonical(t, true, false) + testExtendCanonical(t, true, true) +} -func testExtendCanonical(t *testing.T, full bool) { +func testExtendCanonical(t *testing.T, full, pipeline bool) { length := 5 // Make first chain starting from genesis - _, processor, err := newCanonical(ethash.NewFaker(), length, full) + _, processor, err := newCanonical(ethash.NewFaker(), length, full, pipeline) if err != nil { t.Fatalf("failed to make new canonical chain: %v", err) } @@ -286,10 +361,10 @@ func testExtendCanonical(t *testing.T, full bool) { } } // Start fork from current height - testFork(t, processor, length, 1, full, better) - testFork(t, processor, length, 2, full, better) - testFork(t, processor, length, 5, full, better) - testFork(t, processor, length, 10, full, better) + testFork(t, processor, length, 1, full, pipeline, better) + testFork(t, processor, length, 2, full, pipeline, better) + testFork(t, processor, length, 5, full, pipeline, better) + testFork(t, processor, length, 10, full, pipeline, better) } // Tests that given a starting canonical chain of a given size, it can be extended @@ -313,14 +388,17 @@ func testExtendCanonicalAfterMerge(t *testing.T, full bool) { // Tests that given a starting canonical chain of a given size, creating shorter // forks do not take canonical ownership. -func TestShorterForkHeaders(t *testing.T) { testShorterFork(t, false) } -func TestShorterForkBlocks(t *testing.T) { testShorterFork(t, true) } +func TestShorterForkHeaders(t *testing.T) { testShorterFork(t, false, false) } +func TestShorterForkBlocks(t *testing.T) { + testShorterFork(t, true, false) + testShorterFork(t, true, true) +} -func testShorterFork(t *testing.T, full bool) { +func testShorterFork(t *testing.T, full, pipeline bool) { length := 10 // Make first chain starting from genesis - _, processor, err := newCanonical(ethash.NewFaker(), length, full) + _, processor, err := newCanonical(ethash.NewFaker(), length, full, pipeline) if err != nil { t.Fatalf("failed to make new canonical chain: %v", err) } @@ -333,12 +411,12 @@ func testShorterFork(t *testing.T, full bool) { } } // Sum of numbers must be less than `length` for this to be a shorter fork - testFork(t, processor, 0, 3, full, worse) - testFork(t, processor, 0, 7, full, worse) - testFork(t, processor, 1, 1, full, worse) - testFork(t, processor, 1, 7, full, worse) - testFork(t, processor, 5, 3, full, worse) - testFork(t, processor, 5, 4, full, worse) + testFork(t, processor, 0, 3, full, pipeline, worse) + testFork(t, processor, 0, 7, full, pipeline, worse) + testFork(t, processor, 1, 1, full, pipeline, worse) + testFork(t, processor, 1, 7, full, pipeline, worse) + testFork(t, processor, 5, 3, full, pipeline, worse) + testFork(t, processor, 5, 4, full, pipeline, worse) } // Tests that given a starting canonical chain of a given size, creating shorter @@ -366,14 +444,20 @@ func testShorterForkAfterMerge(t *testing.T, full bool) { // Tests that given a starting canonical chain of a given size, creating longer // forks do take canonical ownership. -func TestLongerForkHeaders(t *testing.T) { testLongerFork(t, false) } -func TestLongerForkBlocks(t *testing.T) { testLongerFork(t, true) } +func TestLongerForkHeaders(t *testing.T) { + testLongerFork(t, false, false) +} +func TestLongerForkBlocks(t *testing.T) { + testLongerFork(t, true, false) + testLongerFork(t, true, true) -func testLongerFork(t *testing.T, full bool) { +} + +func testLongerFork(t *testing.T, full, pipeline bool) { length := 10 // Make first chain starting from genesis - _, processor, err := newCanonical(ethash.NewFaker(), length, full) + _, processor, err := newCanonical(ethash.NewFaker(), length, full, pipeline) if err != nil { t.Fatalf("failed to make new canonical chain: %v", err) } @@ -392,7 +476,7 @@ func testLongerFork(t *testing.T, full bool) { func TestLongerForkHeadersAfterMerge(t *testing.T) { testLongerForkAfterMerge(t, false) } func TestLongerForkBlocksAfterMerge(t *testing.T) { testLongerForkAfterMerge(t, true) } -func testLongerForkAfterMerge(t *testing.T, full bool) { +func testLongerForkAfterMerge(t *testing.T, full bool, pipeline,) { length := 10 // Make first chain starting from genesis @@ -402,24 +486,28 @@ func testLongerForkAfterMerge(t *testing.T, full bool) { } defer processor.Stop() - testInsertAfterMerge(t, processor, 0, 11, full) - testInsertAfterMerge(t, processor, 0, 15, full) - testInsertAfterMerge(t, processor, 1, 10, full) - testInsertAfterMerge(t, processor, 1, 12, full) - testInsertAfterMerge(t, processor, 5, 6, full) - testInsertAfterMerge(t, processor, 5, 8, full) + testInsertAfterMerge(t, processor, 0, 11, pipeline,full) + testInsertAfterMerge(t, processor, 0, 15, pipeline, full) + testInsertAfterMerge(t, processor, 1, 10, pipeline, full) + testInsertAfterMerge(t, processor, 1, 12, pipeline, full) + testInsertAfterMerge(t, processor, 5, 6,pipeline, full) + testInsertAfterMerge(t, processor, 5, 8, pipeline,full) } // Tests that given a starting canonical chain of a given size, creating equal // forks do take canonical ownership. -func TestEqualForkHeaders(t *testing.T) { testEqualFork(t, false) } -func TestEqualForkBlocks(t *testing.T) { testEqualFork(t, true) } +func TestEqualForkHeaders(t *testing.T) { testEqualFork(t, false, false) } +func TestEqualForkBlocks(t *testing.T) { + testEqualFork(t, true, true) + testEqualFork(t, true, false) + +} -func testEqualFork(t *testing.T, full bool) { +func testEqualFork(t *testing.T, full, pipeline bool) { length := 10 // Make first chain starting from genesis - _, processor, err := newCanonical(ethash.NewFaker(), length, full) + _, processor, err := newCanonical(ethash.NewFaker(), length, full, pipeline) if err != nil { t.Fatalf("failed to make new canonical chain: %v", err) } @@ -432,12 +520,12 @@ func testEqualFork(t *testing.T, full bool) { } } // Sum of numbers must be equal to `length` for this to be an equal fork - testFork(t, processor, 0, 10, full, equal) - testFork(t, processor, 1, 9, full, equal) - testFork(t, processor, 2, 8, full, equal) - testFork(t, processor, 5, 5, full, equal) - testFork(t, processor, 6, 4, full, equal) - testFork(t, processor, 9, 1, full, equal) + testFork(t, processor, 0, 10, full, pipeline, equal) + testFork(t, processor, 1, 9, full, pipeline, equal) + testFork(t, processor, 2, 8, full, pipeline, equal) + testFork(t, processor, 5, 5, full, pipeline, equal) + testFork(t, processor, 6, 4, full, pipeline, equal) + testFork(t, processor, 9, 1, full, pipeline, equal) } // Tests that given a starting canonical chain of a given size, creating equal @@ -464,12 +552,15 @@ func testEqualForkAfterMerge(t *testing.T, full bool) { } // Tests that chains missing links do not get accepted by the processor. -func TestBrokenHeaderChain(t *testing.T) { testBrokenChain(t, false) } -func TestBrokenBlockChain(t *testing.T) { testBrokenChain(t, true) } +func TestBrokenHeaderChain(t *testing.T) { testBrokenChain(t, false, false) } +func TestBrokenBlockChain(t *testing.T) { + testBrokenChain(t, true, false) + testBrokenChain(t, true, true) +} -func testBrokenChain(t *testing.T, full bool) { +func testBrokenChain(t *testing.T, full, pipeline bool) { // Make chain starting from genesis - db, blockchain, err := newCanonical(ethash.NewFaker(), 10, full) + db, blockchain, err := newCanonical(ethash.NewFaker(), 10, full, pipeline) if err != nil { t.Fatalf("failed to make new canonical chain: %v", err) } @@ -477,12 +568,12 @@ func testBrokenChain(t *testing.T, full bool) { // Create a forked chain, and try to insert with a missing link if full { - chain := makeBlockChain(blockchain.CurrentBlock(), 5, ethash.NewFaker(), db, forkSeed)[1:] - if err := testBlockChainImport(chain, blockchain); err == nil { + chain := makeBlockChain(blockchain.CurrentBlock(), 5, ethash.NewFaker(), db, forkSeed1)[1:] + if err := testBlockChainImport(chain, pipeline, blockchain); err == nil { t.Errorf("broken block chain not reported") } } else { - chain := makeHeaderChain(blockchain.CurrentHeader(), 5, ethash.NewFaker(), db, forkSeed)[1:] + chain := makeHeaderChain(blockchain.CurrentHeader(), 5, ethash.NewFaker(), db, forkSeed1)[1:] if err := testHeaderChainImport(chain, blockchain); err == nil { t.Errorf("broken header chain not reported") } @@ -491,19 +582,25 @@ func testBrokenChain(t *testing.T, full bool) { // Tests that reorganising a long difficult chain after a short easy one // overwrites the canonical numbers and links in the database. -func TestReorgLongHeaders(t *testing.T) { testReorgLong(t, false) } -func TestReorgLongBlocks(t *testing.T) { testReorgLong(t, true) } +func TestReorgLongHeaders(t *testing.T) { testReorgLong(t, false, false) } +func TestReorgLongBlocks(t *testing.T) { + testReorgLong(t, true, false) + testReorgLong(t, true, true) +} -func testReorgLong(t *testing.T, full bool) { - testReorg(t, []int64{0, 0, -9}, []int64{0, 0, 0, -9}, 393280+params.GenesisDifficulty.Int64(), full) +func testReorgLong(t *testing.T, full, pipeline bool) { + testReorg(t, []int64{0, 0, -9}, []int64{0, 0, 0, -9}, 393280+params.GenesisDifficulty.Int64(), full, pipeline) } // Tests that reorganising a short difficult chain after a long easy one // overwrites the canonical numbers and links in the database. -func TestReorgShortHeaders(t *testing.T) { testReorgShort(t, false) } -func TestReorgShortBlocks(t *testing.T) { testReorgShort(t, true) } +func TestReorgShortHeaders(t *testing.T) { testReorgShort(t, false, false) } +func TestReorgShortBlocks(t *testing.T) { + testReorgShort(t, true, false) + testReorgShort(t, true, true) +} -func testReorgShort(t *testing.T, full bool) { +func testReorgShort(t *testing.T, full, pipeline bool) { // Create a long easy chain vs. a short heavy one. Due to difficulty adjustment // we need a fairly long chain of blocks with different difficulties for a short // one to become heavyer than a long one. The 96 is an empirical value. @@ -515,12 +612,12 @@ func testReorgShort(t *testing.T, full bool) { for i := 0; i < len(diff); i++ { diff[i] = -9 } - testReorg(t, easy, diff, 12615120+params.GenesisDifficulty.Int64(), full) + testReorg(t, easy, diff, 12615120+params.GenesisDifficulty.Int64(), full,pipeline) } -func testReorg(t *testing.T, first, second []int64, td int64, full bool) { +func testReorg(t *testing.T, first, second []int64, td int64, full, pipeline bool) { // Create a pristine chain and database - db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full) + db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full, pipeline) if err != nil { t.Fatalf("failed to create pristine chain: %v", err) } @@ -588,12 +685,16 @@ func testReorg(t *testing.T, first, second []int64, td int64, full bool) { } // Tests that the insertion functions detect banned hashes. -func TestBadHeaderHashes(t *testing.T) { testBadHashes(t, false) } -func TestBadBlockHashes(t *testing.T) { testBadHashes(t, true) } +func TestBadHeaderHashes(t *testing.T) { testBadHashes(t, false, false) } +func TestBadBlockHashes(t *testing.T) { + testBadHashes(t, true, true) + testBadHashes(t, true, false) + +} -func testBadHashes(t *testing.T, full bool) { +func testBadHashes(t *testing.T, full, pipeline bool) { // Create a pristine chain and database - db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full) + db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full, pipeline) if err != nil { t.Fatalf("failed to create pristine chain: %v", err) } @@ -622,12 +723,16 @@ func testBadHashes(t *testing.T, full bool) { // Tests that bad hashes are detected on boot, and the chain rolled back to a // good state prior to the bad hash. -func TestReorgBadHeaderHashes(t *testing.T) { testReorgBadHashes(t, false) } -func TestReorgBadBlockHashes(t *testing.T) { testReorgBadHashes(t, true) } +func TestReorgBadHeaderHashes(t *testing.T) { testReorgBadHashes(t, false, false) } +func TestReorgBadBlockHashes(t *testing.T) { + testReorgBadHashes(t, true, false) + testReorgBadHashes(t, true, true) + +} -func testReorgBadHashes(t *testing.T, full bool) { +func testReorgBadHashes(t *testing.T, full, pipeline bool) { // Create a pristine chain and database - db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full) + db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full, pipeline) if err != nil { t.Fatalf("failed to create pristine chain: %v", err) } @@ -677,13 +782,16 @@ func testReorgBadHashes(t *testing.T, full bool) { } // Tests chain insertions in the face of one entity containing an invalid nonce. -func TestHeadersInsertNonceError(t *testing.T) { testInsertNonceError(t, false) } -func TestBlocksInsertNonceError(t *testing.T) { testInsertNonceError(t, true) } +func TestHeadersInsertNonceError(t *testing.T) { testInsertNonceError(t, false, false) } +func TestBlocksInsertNonceError(t *testing.T) { + testInsertNonceError(t, true, false) + testInsertNonceError(t, true, true) +} -func testInsertNonceError(t *testing.T, full bool) { +func testInsertNonceError(t *testing.T, full, pipeline bool) { for i := 1; i < 25 && !t.Failed(); i++ { // Create a pristine chain and database - db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full) + db, blockchain, err := newCanonical(ethash.NewFaker(), 0, full, pipeline) if err != nil { t.Fatalf("failed to create pristine chain: %v", err) } @@ -1379,7 +1487,7 @@ done: // Tests if the canonical block can be fetched from the database during chain insertion. func TestCanonicalBlockRetrieval(t *testing.T) { - _, blockchain, err := newCanonical(ethash.NewFaker(), 0, true) + _, blockchain, err := newCanonical(ethash.NewFaker(), 0, true, false) if err != nil { t.Fatalf("failed to create pristine chain: %v", err) } diff --git a/core/chain_makers.go b/core/chain_makers.go index 3f3a101e43..b641b8abd0 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -262,7 +262,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse block, _, _ := b.engine.FinalizeAndAssemble(chainreader, b.header, statedb, b.txs, b.uncles, b.receipts) // Write state changes to db - root, _, err := statedb.Commit(config.IsEIP158(b.header.Number)) + root, _, err := statedb.Commit(nil) if err != nil { panic(fmt.Sprintf("state write error: %v", err)) } diff --git a/core/error.go b/core/error.go index 46a270cd78..0cc61b7a55 100644 --- a/core/error.go +++ b/core/error.go @@ -36,6 +36,9 @@ var ( // ErrDiffLayerNotFound is returned when diff layer not found. ErrDiffLayerNotFound = errors.New("diff layer not found") + + // ErrKnownBadBlock is return when the block is a known bad block + ErrKnownBadBlock = errors.New("already known bad block") ) // List of evm-call-message pre-checking errors. All state transition messages will diff --git a/core/genesis.go b/core/genesis.go index 68cb3bc7cb..151e8c4671 100644 --- a/core/genesis.go +++ b/core/genesis.go @@ -314,7 +314,7 @@ func (g *Genesis) ToBlock(db ethdb.Database) *types.Block { head.BaseFee = new(big.Int).SetUint64(params.InitialBaseFee) } } - statedb.Commit(false) + statedb.Commit(nil) statedb.Database().TrieDB().Commit(root, true, nil) return types.NewBlock(head, nil, nil, nil, trie.NewStackTrie(nil)) diff --git a/core/state/database.go b/core/state/database.go index 936f4aa288..350f513948 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -261,6 +261,9 @@ func (db *cachingDB) Purge() { // CopyTrie returns an independent copy of the given trie. func (db *cachingDB) CopyTrie(t Trie) Trie { + if t == nil { + return nil + } switch t := t.(type) { case *trie.SecureTrie: return t.Copy() diff --git a/core/state/snapshot/difflayer.go b/core/state/snapshot/difflayer.go index c0f0dab568..65b2729d9c 100644 --- a/core/state/snapshot/difflayer.go +++ b/core/state/snapshot/difflayer.go @@ -118,6 +118,9 @@ type diffLayer struct { storageList map[common.Hash][]common.Hash // List of storage slots for iterated retrievals, one per account. Any existing lists are sorted if non-nil storageData map[common.Hash]map[common.Hash][]byte // Keyed storage slots for direct retrieval. one per account (nil means deleted) + verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed + valid bool // mark the difflayer is valid or not. + diffed *bloomfilter.Filter // Bloom filter tracking all the diffed items up to the disk layer lock sync.RWMutex @@ -168,7 +171,7 @@ func (h storageBloomHasher) Sum64() uint64 { // newDiffLayer creates a new diff on top of an existing snapshot, whether that's a low // level persistent database or a hierarchical diff already. -func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer { +func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer { // Create the new layer with some pre-allocated data segments dl := &diffLayer{ parent: parent, @@ -177,6 +180,7 @@ func newDiffLayer(parent snapshot, root common.Hash, destructs map[common.Hash]s accountData: accounts, storageData: storage, storageList: make(map[common.Hash][]common.Hash), + verifiedCh: verified, } switch parent := parent.(type) { case *diskLayer: @@ -256,6 +260,32 @@ func (dl *diffLayer) Root() common.Hash { return dl.root } +// WaitAndGetVerifyRes will wait until the diff layer been verified and return the verification result +func (dl *diffLayer) WaitAndGetVerifyRes() bool { + if dl.verifiedCh == nil { + return true + } + <-dl.verifiedCh + return dl.valid +} + +func (dl *diffLayer) MarkValid() { + dl.valid = true +} + +// Represent whether the difflayer is been verified, does not means it is a valid or invalid difflayer +func (dl *diffLayer) Verified() bool { + if dl.verifiedCh == nil { + return true + } + select { + case <-dl.verifiedCh: + return true + default: + return false + } +} + // Parent returns the subsequent layer of a diff layer. func (dl *diffLayer) Parent() snapshot { return dl.parent @@ -423,8 +453,8 @@ func (dl *diffLayer) storage(accountHash, storageHash common.Hash, depth int) ([ // Update creates a new layer on top of the existing snapshot diff tree with // the specified data items. -func (dl *diffLayer) Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer { - return newDiffLayer(dl, blockRoot, destructs, accounts, storage) +func (dl *diffLayer) Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer { + return newDiffLayer(dl, blockRoot, destructs, accounts, storage, verified) } // flatten pushes all data from this point downwards, flattening everything into diff --git a/core/state/snapshot/difflayer_test.go b/core/state/snapshot/difflayer_test.go index e15c1d5049..a186324745 100644 --- a/core/state/snapshot/difflayer_test.go +++ b/core/state/snapshot/difflayer_test.go @@ -79,11 +79,11 @@ func TestMergeBasics(t *testing.T) { } } // Add some (identical) layers on top - parent := newDiffLayer(emptyLayer(), common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage)) - child := newDiffLayer(parent, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage)) - child = newDiffLayer(child, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage)) - child = newDiffLayer(child, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage)) - child = newDiffLayer(child, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage)) + parent := newDiffLayer(emptyLayer(), common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage), nil) + child := newDiffLayer(parent, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage), nil) + child = newDiffLayer(child, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage), nil) + child = newDiffLayer(child, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage), nil) + child = newDiffLayer(child, common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage), nil) // And flatten merged := (child.flatten()).(*diffLayer) @@ -151,13 +151,13 @@ func TestMergeDelete(t *testing.T) { } } // Add some flipAccs-flopping layers on top - parent := newDiffLayer(emptyLayer(), common.Hash{}, flipDrops(), flipAccs(), storage) - child := parent.Update(common.Hash{}, flopDrops(), flopAccs(), storage) - child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage) - child = child.Update(common.Hash{}, flopDrops(), flopAccs(), storage) - child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage) - child = child.Update(common.Hash{}, flopDrops(), flopAccs(), storage) - child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage) + parent := newDiffLayer(emptyLayer(), common.Hash{}, flipDrops(), flipAccs(), storage, nil) + child := parent.Update(common.Hash{}, flopDrops(), flopAccs(), storage, nil) + child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage, nil) + child = child.Update(common.Hash{}, flopDrops(), flopAccs(), storage, nil) + child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage, nil) + child = child.Update(common.Hash{}, flopDrops(), flopAccs(), storage, nil) + child = child.Update(common.Hash{}, flipDrops(), flipAccs(), storage, nil) if data, _ := child.Account(h1); data == nil { t.Errorf("last diff layer: expected %x account to be non-nil", h1) @@ -209,7 +209,7 @@ func TestInsertAndMerge(t *testing.T) { accounts = make(map[common.Hash][]byte) storage = make(map[common.Hash]map[common.Hash][]byte) ) - parent = newDiffLayer(emptyLayer(), common.Hash{}, destructs, accounts, storage) + parent = newDiffLayer(emptyLayer(), common.Hash{}, destructs, accounts, storage, nil) } { var ( @@ -220,7 +220,7 @@ func TestInsertAndMerge(t *testing.T) { accounts[acc] = randomAccount() storage[acc] = make(map[common.Hash][]byte) storage[acc][slot] = []byte{0x01} - child = newDiffLayer(parent, common.Hash{}, destructs, accounts, storage) + child = newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil) } // And flatten merged := (child.flatten()).(*diffLayer) @@ -256,7 +256,7 @@ func BenchmarkSearch(b *testing.B) { for i := 0; i < 10000; i++ { accounts[randomHash()] = randomAccount() } - return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage) + return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil) } var layer snapshot layer = emptyLayer() @@ -298,7 +298,7 @@ func BenchmarkSearchSlot(b *testing.B) { accStorage[randomHash()] = value storage[accountKey] = accStorage } - return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage) + return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil) } var layer snapshot layer = emptyLayer() @@ -336,7 +336,7 @@ func BenchmarkFlatten(b *testing.B) { } storage[accountKey] = accStorage } - return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage) + return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil) } b.ResetTimer() for i := 0; i < b.N; i++ { @@ -386,7 +386,7 @@ func BenchmarkJournal(b *testing.B) { } storage[accountKey] = accStorage } - return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage) + return newDiffLayer(parent, common.Hash{}, destructs, accounts, storage, nil) } layer := snapshot(emptyLayer()) for i := 1; i < 128; i++ { diff --git a/core/state/snapshot/disklayer.go b/core/state/snapshot/disklayer.go index 7cbf6e293d..c1de41782c 100644 --- a/core/state/snapshot/disklayer.go +++ b/core/state/snapshot/disklayer.go @@ -49,6 +49,16 @@ func (dl *diskLayer) Root() common.Hash { return dl.root } +func (dl *diskLayer) WaitAndGetVerifyRes() bool { + return true +} + +func (dl *diskLayer) MarkValid() {} + +func (dl *diskLayer) Verified() bool { + return true +} + // Parent always returns nil as there's no layer below the disk. func (dl *diskLayer) Parent() snapshot { return nil @@ -161,6 +171,6 @@ func (dl *diskLayer) Storage(accountHash, storageHash common.Hash) ([]byte, erro // Update creates a new layer on top of the existing snapshot diff tree with // the specified data items. Note, the maps are retained by the method to avoid // copying everything. -func (dl *diskLayer) Update(blockHash common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer { - return newDiffLayer(dl, blockHash, destructs, accounts, storage) +func (dl *diskLayer) Update(blockHash common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer { + return newDiffLayer(dl, blockHash, destructs, accounts, storage, verified) } diff --git a/core/state/snapshot/disklayer_test.go b/core/state/snapshot/disklayer_test.go index 362edba90d..689ed38773 100644 --- a/core/state/snapshot/disklayer_test.go +++ b/core/state/snapshot/disklayer_test.go @@ -134,7 +134,7 @@ func TestDiskMerge(t *testing.T) { conModCache: {conModCacheSlot: reverse(conModCacheSlot[:])}, conDelNoCache: {conDelNoCacheSlot: nil}, conDelCache: {conDelCacheSlot: nil}, - }); err != nil { + }, nil); err != nil { t.Fatalf("failed to update snapshot tree: %v", err) } if err := snaps.Cap(diffRoot, 0); err != nil { @@ -357,7 +357,7 @@ func TestDiskPartialMerge(t *testing.T) { conModCache: {conModCacheSlot: reverse(conModCacheSlot[:])}, conDelNoCache: {conDelNoCacheSlot: nil}, conDelCache: {conDelCacheSlot: nil}, - }); err != nil { + }, nil); err != nil { t.Fatalf("test %d: failed to update snapshot tree: %v", i, err) } if err := snaps.Cap(diffRoot, 0); err != nil { @@ -468,7 +468,7 @@ func TestDiskGeneratorPersistence(t *testing.T) { // Modify or delete some accounts, flatten everything onto disk if err := snaps.update(diffRoot, baseRoot, nil, map[common.Hash][]byte{ accTwo: accTwo[:], - }, nil); err != nil { + }, nil, nil); err != nil { t.Fatalf("failed to update snapshot tree: %v", err) } if err := snaps.Cap(diffRoot, 0); err != nil { @@ -488,7 +488,7 @@ func TestDiskGeneratorPersistence(t *testing.T) { accThree: accThree.Bytes(), }, map[common.Hash]map[common.Hash][]byte{ accThree: {accThreeSlot: accThreeSlot.Bytes()}, - }); err != nil { + }, nil); err != nil { t.Fatalf("failed to update snapshot tree: %v", err) } diskLayer := snaps.layers[snaps.diskRoot()].(*diskLayer) diff --git a/core/state/snapshot/iterator_test.go b/core/state/snapshot/iterator_test.go index 2a27b01577..3ffaff32ed 100644 --- a/core/state/snapshot/iterator_test.go +++ b/core/state/snapshot/iterator_test.go @@ -53,7 +53,7 @@ func TestAccountIteratorBasics(t *testing.T) { } } // Add some (identical) layers on top - diffLayer := newDiffLayer(emptyLayer(), common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage)) + diffLayer := newDiffLayer(emptyLayer(), common.Hash{}, copyDestructs(destructs), copyAccounts(accounts), copyStorage(storage), nil) it := diffLayer.AccountIterator(common.Hash{}) verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator @@ -91,7 +91,7 @@ func TestStorageIteratorBasics(t *testing.T) { nilStorage[h] = nilstorage } // Add some (identical) layers on top - diffLayer := newDiffLayer(emptyLayer(), common.Hash{}, nil, copyAccounts(accounts), copyStorage(storage)) + diffLayer := newDiffLayer(emptyLayer(), common.Hash{}, nil, copyAccounts(accounts), copyStorage(storage), nil) for account := range accounts { it, _ := diffLayer.StorageIterator(account, common.Hash{}) verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator @@ -222,13 +222,13 @@ func TestAccountIteratorTraversal(t *testing.T) { } // Stack three diff layers on top with various overlaps snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, - randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil) + randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil, nil) snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, - randomAccountSet("0xbb", "0xdd", "0xf0"), nil) + randomAccountSet("0xbb", "0xdd", "0xf0"), nil, nil) snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, - randomAccountSet("0xcc", "0xf0", "0xff"), nil) + randomAccountSet("0xcc", "0xf0", "0xff"), nil, nil) // Verify the single and multi-layer iterators head := snaps.Snapshot(common.HexToHash("0x04")) @@ -269,13 +269,13 @@ func TestStorageIteratorTraversal(t *testing.T) { } // Stack three diff layers on top with various overlaps snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, - randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x02", "0x03"}}, nil)) + randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x02", "0x03"}}, nil), nil) snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, - randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x04", "0x05", "0x06"}}, nil)) + randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x04", "0x05", "0x06"}}, nil), nil) snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, - randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x02", "0x03"}}, nil)) + randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x02", "0x03"}}, nil), nil) // Verify the single and multi-layer iterators head := snaps.Snapshot(common.HexToHash("0x04")) @@ -353,14 +353,14 @@ func TestAccountIteratorTraversalValues(t *testing.T) { } } // Assemble a stack of snapshots from the account layers - snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, a, nil) - snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, b, nil) - snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, c, nil) - snaps.update(common.HexToHash("0x05"), common.HexToHash("0x04"), nil, d, nil) - snaps.update(common.HexToHash("0x06"), common.HexToHash("0x05"), nil, e, nil) - snaps.update(common.HexToHash("0x07"), common.HexToHash("0x06"), nil, f, nil) - snaps.update(common.HexToHash("0x08"), common.HexToHash("0x07"), nil, g, nil) - snaps.update(common.HexToHash("0x09"), common.HexToHash("0x08"), nil, h, nil) + snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, a, nil, nil) + snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, b, nil, nil) + snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, c, nil, nil) + snaps.update(common.HexToHash("0x05"), common.HexToHash("0x04"), nil, d, nil, nil) + snaps.update(common.HexToHash("0x06"), common.HexToHash("0x05"), nil, e, nil, nil) + snaps.update(common.HexToHash("0x07"), common.HexToHash("0x06"), nil, f, nil, nil) + snaps.update(common.HexToHash("0x08"), common.HexToHash("0x07"), nil, g, nil, nil) + snaps.update(common.HexToHash("0x09"), common.HexToHash("0x08"), nil, h, nil, nil) it, _ := snaps.AccountIterator(common.HexToHash("0x09"), common.Hash{}) head := snaps.Snapshot(common.HexToHash("0x09")) @@ -452,14 +452,14 @@ func TestStorageIteratorTraversalValues(t *testing.T) { } } // Assemble a stack of snapshots from the account layers - snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, randomAccountSet("0xaa"), wrapStorage(a)) - snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, randomAccountSet("0xaa"), wrapStorage(b)) - snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, randomAccountSet("0xaa"), wrapStorage(c)) - snaps.update(common.HexToHash("0x05"), common.HexToHash("0x04"), nil, randomAccountSet("0xaa"), wrapStorage(d)) - snaps.update(common.HexToHash("0x06"), common.HexToHash("0x05"), nil, randomAccountSet("0xaa"), wrapStorage(e)) - snaps.update(common.HexToHash("0x07"), common.HexToHash("0x06"), nil, randomAccountSet("0xaa"), wrapStorage(e)) - snaps.update(common.HexToHash("0x08"), common.HexToHash("0x07"), nil, randomAccountSet("0xaa"), wrapStorage(g)) - snaps.update(common.HexToHash("0x09"), common.HexToHash("0x08"), nil, randomAccountSet("0xaa"), wrapStorage(h)) + snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, randomAccountSet("0xaa"), wrapStorage(a), nil) + snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, randomAccountSet("0xaa"), wrapStorage(b), nil) + snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, randomAccountSet("0xaa"), wrapStorage(c), nil) + snaps.update(common.HexToHash("0x05"), common.HexToHash("0x04"), nil, randomAccountSet("0xaa"), wrapStorage(d), nil) + snaps.update(common.HexToHash("0x06"), common.HexToHash("0x05"), nil, randomAccountSet("0xaa"), wrapStorage(e), nil) + snaps.update(common.HexToHash("0x07"), common.HexToHash("0x06"), nil, randomAccountSet("0xaa"), wrapStorage(e), nil) + snaps.update(common.HexToHash("0x08"), common.HexToHash("0x07"), nil, randomAccountSet("0xaa"), wrapStorage(g), nil) + snaps.update(common.HexToHash("0x09"), common.HexToHash("0x08"), nil, randomAccountSet("0xaa"), wrapStorage(h), nil) it, _ := snaps.StorageIterator(common.HexToHash("0x09"), common.HexToHash("0xaa"), common.Hash{}) head := snaps.Snapshot(common.HexToHash("0x09")) @@ -522,7 +522,7 @@ func TestAccountIteratorLargeTraversal(t *testing.T) { }, } for i := 1; i < 128; i++ { - snaps.update(common.HexToHash(fmt.Sprintf("0x%02x", i+1)), common.HexToHash(fmt.Sprintf("0x%02x", i)), nil, makeAccounts(200), nil) + snaps.update(common.HexToHash(fmt.Sprintf("0x%02x", i+1)), common.HexToHash(fmt.Sprintf("0x%02x", i)), nil, makeAccounts(200), nil, nil) } // Iterate the entire stack and ensure everything is hit only once head := snaps.Snapshot(common.HexToHash("0x80")) @@ -567,13 +567,13 @@ func TestAccountIteratorFlattening(t *testing.T) { } // Create a stack of diffs on top snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, - randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil) + randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil, nil) snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, - randomAccountSet("0xbb", "0xdd", "0xf0"), nil) + randomAccountSet("0xbb", "0xdd", "0xf0"), nil, nil) snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, - randomAccountSet("0xcc", "0xf0", "0xff"), nil) + randomAccountSet("0xcc", "0xf0", "0xff"), nil, nil) // Create an iterator and flatten the data from underneath it it, _ := snaps.AccountIterator(common.HexToHash("0x04"), common.Hash{}) @@ -598,13 +598,13 @@ func TestAccountIteratorSeek(t *testing.T) { }, } snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, - randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil) + randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil, nil) snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, - randomAccountSet("0xbb", "0xdd", "0xf0"), nil) + randomAccountSet("0xbb", "0xdd", "0xf0"), nil, nil) snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, - randomAccountSet("0xcc", "0xf0", "0xff"), nil) + randomAccountSet("0xcc", "0xf0", "0xff"), nil, nil) // Account set is now // 02: aa, ee, f0, ff @@ -662,13 +662,13 @@ func TestStorageIteratorSeek(t *testing.T) { } // Stack three diff layers on top with various overlaps snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, - randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x03", "0x05"}}, nil)) + randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x03", "0x05"}}, nil), nil) snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, - randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x02", "0x05", "0x06"}}, nil)) + randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x02", "0x05", "0x06"}}, nil), nil) snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, - randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x05", "0x08"}}, nil)) + randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x05", "0x08"}}, nil), nil) // Account set is now // 02: 01, 03, 05 @@ -725,17 +725,17 @@ func TestAccountIteratorDeletions(t *testing.T) { } // Stack three diff layers on top with various overlaps snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), - nil, randomAccountSet("0x11", "0x22", "0x33"), nil) + nil, randomAccountSet("0x11", "0x22", "0x33"), nil, nil) deleted := common.HexToHash("0x22") destructed := map[common.Hash]struct{}{ deleted: {}, } snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), - destructed, randomAccountSet("0x11", "0x33"), nil) + destructed, randomAccountSet("0x11", "0x33"), nil, nil) snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), - nil, randomAccountSet("0x33", "0x44", "0x55"), nil) + nil, randomAccountSet("0x33", "0x44", "0x55"), nil, nil) // The output should be 11,33,44,55 it, _ := snaps.AccountIterator(common.HexToHash("0x04"), common.Hash{}) @@ -771,10 +771,10 @@ func TestStorageIteratorDeletions(t *testing.T) { } // Stack three diff layers on top with various overlaps snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, - randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x03", "0x05"}}, nil)) + randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x03", "0x05"}}, nil), nil) snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, - randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x02", "0x04", "0x06"}}, [][]string{{"0x01", "0x03"}})) + randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x02", "0x04", "0x06"}}, [][]string{{"0x01", "0x03"}}), nil) // The output should be 02,04,05,06 it, _ := snaps.StorageIterator(common.HexToHash("0x03"), common.HexToHash("0xaa"), common.Hash{}) @@ -790,7 +790,7 @@ func TestStorageIteratorDeletions(t *testing.T) { destructed := map[common.Hash]struct{}{ common.HexToHash("0xaa"): {}, } - snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), destructed, nil, nil) + snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), destructed, nil, nil, nil) it, _ = snaps.StorageIterator(common.HexToHash("0x04"), common.HexToHash("0xaa"), common.Hash{}) verifyIterator(t, 0, it, verifyStorage) @@ -798,7 +798,7 @@ func TestStorageIteratorDeletions(t *testing.T) { // Re-insert the slots of the same account snaps.update(common.HexToHash("0x05"), common.HexToHash("0x04"), nil, - randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x07", "0x08", "0x09"}}, nil)) + randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x07", "0x08", "0x09"}}, nil), nil) // The output should be 07,08,09 it, _ = snaps.StorageIterator(common.HexToHash("0x05"), common.HexToHash("0xaa"), common.Hash{}) @@ -806,7 +806,7 @@ func TestStorageIteratorDeletions(t *testing.T) { it.Release() // Destruct the whole storage but re-create the account in the same layer - snaps.update(common.HexToHash("0x06"), common.HexToHash("0x05"), destructed, randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x11", "0x12"}}, nil)) + snaps.update(common.HexToHash("0x06"), common.HexToHash("0x05"), destructed, randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x11", "0x12"}}, nil), nil) it, _ = snaps.StorageIterator(common.HexToHash("0x06"), common.HexToHash("0xaa"), common.Hash{}) verifyIterator(t, 2, it, verifyStorage) // The output should be 11,12 it.Release() @@ -848,7 +848,7 @@ func BenchmarkAccountIteratorTraversal(b *testing.B) { }, } for i := 1; i <= 100; i++ { - snaps.update(common.HexToHash(fmt.Sprintf("0x%02x", i+1)), common.HexToHash(fmt.Sprintf("0x%02x", i)), nil, makeAccounts(200), nil) + snaps.update(common.HexToHash(fmt.Sprintf("0x%02x", i+1)), common.HexToHash(fmt.Sprintf("0x%02x", i)), nil, makeAccounts(200), nil, nil) } // We call this once before the benchmark, so the creation of // sorted accountlists are not included in the results. @@ -943,9 +943,9 @@ func BenchmarkAccountIteratorLargeBaselayer(b *testing.B) { base.root: base, }, } - snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, makeAccounts(2000), nil) + snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, makeAccounts(2000), nil, nil) for i := 2; i <= 100; i++ { - snaps.update(common.HexToHash(fmt.Sprintf("0x%02x", i+1)), common.HexToHash(fmt.Sprintf("0x%02x", i)), nil, makeAccounts(20), nil) + snaps.update(common.HexToHash(fmt.Sprintf("0x%02x", i+1)), common.HexToHash(fmt.Sprintf("0x%02x", i)), nil, makeAccounts(20), nil, nil) } // We call this once before the benchmark, so the creation of // sorted accountlists are not included in the results. diff --git a/core/state/snapshot/journal.go b/core/state/snapshot/journal.go index 6836a57409..d11ca6a05f 100644 --- a/core/state/snapshot/journal.go +++ b/core/state/snapshot/journal.go @@ -266,7 +266,7 @@ func loadDiffLayer(parent snapshot, r *rlp.Stream) (snapshot, error) { } storageData[entry.Hash] = slots } - return loadDiffLayer(newDiffLayer(parent, root, destructSet, accountData, storageData), r) + return loadDiffLayer(newDiffLayer(parent, root, destructSet, accountData, storageData, nil), r) } // Journal terminates any in-progress snapshot generation, also implicitly pushing diff --git a/core/state/snapshot/snapshot.go b/core/state/snapshot/snapshot.go index 61d4fbbf21..62a372e5b6 100644 --- a/core/state/snapshot/snapshot.go +++ b/core/state/snapshot/snapshot.go @@ -101,6 +101,15 @@ type Snapshot interface { // Root returns the root hash for which this snapshot was made. Root() common.Hash + // WaitAndGetVerifyRes will wait until the snapshot been verified and return verification result + WaitAndGetVerifyRes() bool + + // Verified returns whether the snapshot is verified + Verified() bool + + // Store the verification result + MarkValid() + // Account directly retrieves the account associated with a particular hash in // the snapshot slim data format. Account(hash common.Hash) (*Account, error) @@ -130,7 +139,7 @@ type snapshot interface { // the specified data items. // // Note, the maps are retained by the method to avoid copying everything. - Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer + Update(blockRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) *diffLayer // Journal commits an entire diff hierarchy to disk into a single journal entry. // This is meant to be used during shutdown to persist the snapshot without @@ -331,14 +340,14 @@ func (t *Tree) Snapshots(root common.Hash, limits int, nodisk bool) []Snapshot { return ret } -func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Address]struct{}, accounts map[common.Address][]byte, storage map[common.Address]map[string][]byte) error { +func (t *Tree) Update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Address]struct{}, accounts map[common.Address][]byte, storage map[common.Address]map[string][]byte, verified chan struct{}) error { hashDestructs, hashAccounts, hashStorage := transformSnapData(destructs, accounts, storage) - return t.update(blockRoot, parentRoot, hashDestructs, hashAccounts, hashStorage) + return t.update(blockRoot, parentRoot, hashDestructs, hashAccounts, hashStorage, verified) } // Update adds a new snapshot into the tree, if that can be linked to an existing // old parent. It is disallowed to insert a disk layer (the origin of all). -func (t *Tree) update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) error { +func (t *Tree) update(blockRoot common.Hash, parentRoot common.Hash, destructs map[common.Hash]struct{}, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte, verified chan struct{}) error { // Reject noop updates to avoid self-loops in the snapshot tree. This is a // special case that can only happen for Clique networks where empty blocks // don't modify the state (0 block subsidy). @@ -353,7 +362,7 @@ func (t *Tree) update(blockRoot common.Hash, parentRoot common.Hash, destructs m if parent == nil { return fmt.Errorf("parent [%#x] snapshot missing", parentRoot) } - snap := parent.(snapshot).Update(blockRoot, destructs, accounts, storage) + snap := parent.(snapshot).Update(blockRoot, destructs, accounts, storage, verified) // Save the new snapshot for later t.lock.Lock() diff --git a/core/state/snapshot/snapshot_test.go b/core/state/snapshot/snapshot_test.go index 92dc215611..a10ea3cf90 100644 --- a/core/state/snapshot/snapshot_test.go +++ b/core/state/snapshot/snapshot_test.go @@ -106,7 +106,7 @@ func TestDiskLayerExternalInvalidationFullFlatten(t *testing.T) { accounts := map[common.Hash][]byte{ common.HexToHash("0xa1"): randomAccount(), } - if err := snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, accounts, nil); err != nil { + if err := snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, accounts, nil, nil); err != nil { t.Fatalf("failed to create a diff layer: %v", err) } if n := len(snaps.layers); n != 2 { @@ -150,10 +150,10 @@ func TestDiskLayerExternalInvalidationPartialFlatten(t *testing.T) { accounts := map[common.Hash][]byte{ common.HexToHash("0xa1"): randomAccount(), } - if err := snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, accounts, nil); err != nil { + if err := snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, accounts, nil, nil); err != nil { t.Fatalf("failed to create a diff layer: %v", err) } - if err := snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, accounts, nil); err != nil { + if err := snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, accounts, nil, nil); err != nil { t.Fatalf("failed to create a diff layer: %v", err) } if n := len(snaps.layers); n != 3 { @@ -198,13 +198,13 @@ func TestDiffLayerExternalInvalidationPartialFlatten(t *testing.T) { accounts := map[common.Hash][]byte{ common.HexToHash("0xa1"): randomAccount(), } - if err := snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, accounts, nil); err != nil { + if err := snaps.update(common.HexToHash("0x02"), common.HexToHash("0x01"), nil, accounts, nil, nil); err != nil { t.Fatalf("failed to create a diff layer: %v", err) } - if err := snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, accounts, nil); err != nil { + if err := snaps.update(common.HexToHash("0x03"), common.HexToHash("0x02"), nil, accounts, nil, nil); err != nil { t.Fatalf("failed to create a diff layer: %v", err) } - if err := snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, accounts, nil); err != nil { + if err := snaps.update(common.HexToHash("0x04"), common.HexToHash("0x03"), nil, accounts, nil, nil); err != nil { t.Fatalf("failed to create a diff layer: %v", err) } if n := len(snaps.layers); n != 4 { @@ -258,12 +258,12 @@ func TestPostCapBasicDataAccess(t *testing.T) { }, } // The lowest difflayer - snaps.update(common.HexToHash("0xa1"), common.HexToHash("0x01"), nil, setAccount("0xa1"), nil) - snaps.update(common.HexToHash("0xa2"), common.HexToHash("0xa1"), nil, setAccount("0xa2"), nil) - snaps.update(common.HexToHash("0xb2"), common.HexToHash("0xa1"), nil, setAccount("0xb2"), nil) + snaps.update(common.HexToHash("0xa1"), common.HexToHash("0x01"), nil, setAccount("0xa1"), nil, nil) + snaps.update(common.HexToHash("0xa2"), common.HexToHash("0xa1"), nil, setAccount("0xa2"), nil, nil) + snaps.update(common.HexToHash("0xb2"), common.HexToHash("0xa1"), nil, setAccount("0xb2"), nil, nil) - snaps.update(common.HexToHash("0xa3"), common.HexToHash("0xa2"), nil, setAccount("0xa3"), nil) - snaps.update(common.HexToHash("0xb3"), common.HexToHash("0xb2"), nil, setAccount("0xb3"), nil) + snaps.update(common.HexToHash("0xa3"), common.HexToHash("0xa2"), nil, setAccount("0xa3"), nil, nil) + snaps.update(common.HexToHash("0xb3"), common.HexToHash("0xb2"), nil, setAccount("0xb3"), nil, nil) // checkExist verifies if an account exiss in a snapshot checkExist := func(layer *diffLayer, key string) error { @@ -358,7 +358,7 @@ func TestSnaphots(t *testing.T) { ) for i := 0; i < 129; i++ { head = makeRoot(uint64(i + 2)) - snaps.update(head, last, nil, setAccount(fmt.Sprintf("%d", i+2)), nil) + snaps.update(head, last, nil, setAccount(fmt.Sprintf("%d", i+2)), nil, nil) last = head snaps.Cap(head, 128) // 130 layers (128 diffs + 1 accumulator + 1 disk) } diff --git a/core/state/state_test.go b/core/state/state_test.go index 9f3c3b8c73..4cc5c33a85 100644 --- a/core/state/state_test.go +++ b/core/state/state_test.go @@ -54,7 +54,9 @@ func TestDump(t *testing.T) { // write some of them to the trie s.state.updateStateObject(obj1) s.state.updateStateObject(obj2) - s.state.Commit(false) + s.state.Finalise(false) + s.state.AccountsIntermediateRoot() + s.state.Commit(nil) // check that DumpToCollector contains the state objects that are in trie got := string(s.state.Dump(nil)) @@ -98,7 +100,9 @@ func TestNull(t *testing.T) { var value common.Hash s.state.SetState(address, common.Hash{}, value) - s.state.Commit(false) + s.state.Finalise(false) + s.state.AccountsIntermediateRoot() + s.state.Commit(nil) if value := s.state.GetState(address, common.Hash{}); value != (common.Hash{}) { t.Errorf("expected empty current value, got %x", value) @@ -170,7 +174,9 @@ func TestSnapshot2(t *testing.T) { so0.deleted = false state.SetStateObject(so0) - root, _, _ := state.Commit(false) + state.Finalise(false) + state.AccountsIntermediateRoot() + root, _, _ := state.Commit(nil) state, _ = New(root, state.db, state.snaps) // and one with deleted == true diff --git a/core/state/statedb.go b/core/state/statedb.go index d5f6397873..d905b81662 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -74,14 +74,20 @@ func (n *proofList) Delete(key []byte) error { // * Accounts type StateDB struct { db Database + prefetcherLock sync.Mutex prefetcher *triePrefetcher originalRoot common.Hash // The pre-state root, before any changes were made + expectedRoot common.Hash // The state root in the block header + stateRoot common.Hash // The calculation result of IntermediateRoot + trie Trie hasher crypto.KeccakState diffLayer *types.DiffLayer diffTries map[common.Address]Trie diffCode map[common.Hash][]byte lightProcessed bool + fullProcessed bool + pipeCommit bool snapMux sync.Mutex snaps *snapshot.Tree @@ -159,11 +165,6 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, journal: newJournal(), hasher: crypto.NewKeccakState(), } - tr, err := db.OpenTrie(root) - if err != nil { - return nil, err - } - sdb.trie = tr if sdb.snaps != nil { if sdb.snap = sdb.snaps.Snapshot(root); sdb.snap != nil { sdb.snapDestructs = make(map[common.Address]struct{}) @@ -171,6 +172,14 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, sdb.snapStorage = make(map[common.Address]map[string][]byte) } } + + snapVerified := sdb.snap != nil && sdb.snap.Verified() + tr, err := db.OpenTrie(root) + // return error when 1. failed to open trie and 2. the snap is nil or the snap is not nil and done verification + if err != nil && (sdb.snap == nil || snapVerified) { + return nil, err + } + sdb.trie = tr return sdb, nil } @@ -178,6 +187,8 @@ func newStateDB(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, // state trie concurrently while the state is mutated so that when we reach the // commit phase, most of the needed data is already hot. func (s *StateDB) StartPrefetcher(namespace string) { + s.prefetcherLock.Lock() + defer s.prefetcherLock.Unlock() if s.prefetcher != nil { s.prefetcher.close() s.prefetcher = nil @@ -190,17 +201,36 @@ func (s *StateDB) StartPrefetcher(namespace string) { // StopPrefetcher terminates a running prefetcher and reports any leftover stats // from the gathered metrics. func (s *StateDB) StopPrefetcher() { + s.prefetcherLock.Lock() + defer s.prefetcherLock.Unlock() if s.prefetcher != nil { s.prefetcher.close() s.prefetcher = nil } } +// Mark that the block is processed by diff layer +func (s *StateDB) SetExpectedStateRoot(root common.Hash) { + s.expectedRoot = root +} + // Mark that the block is processed by diff layer func (s *StateDB) MarkLightProcessed() { s.lightProcessed = true } +// Enable the pipeline commit function of statedb +func (s *StateDB) EnablePipeCommit() { + if s.snap != nil { + s.pipeCommit = true + } +} + +// Mark that the block is full processed +func (s *StateDB) MarkFullProcessed() { + s.fullProcessed = true +} + func (s *StateDB) IsLightProcessed() bool { return s.lightProcessed } @@ -216,8 +246,20 @@ func (s *StateDB) Error() error { return s.dbErr } -func (s *StateDB) Trie() Trie { - return s.trie +// Not thread safe +func (s *StateDB) Trie() (Trie, error) { + if s.trie == nil { + err := s.WaitPipeVerification() + if err != nil { + return nil, err + } + tr, err := s.db.OpenTrie(s.originalRoot) + if err != nil { + return nil, err + } + s.trie = tr + } + return s.trie, nil } func (s *StateDB) SetDiff(diffLayer *types.DiffLayer, diffTries map[common.Address]Trie, diffCode map[common.Hash][]byte) { @@ -363,6 +405,9 @@ func (s *StateDB) GetProof(addr common.Address) ([][]byte, error) { // GetProofByHash returns the Merkle proof for a given account. func (s *StateDB) GetProofByHash(addrHash common.Hash) ([][]byte, error) { var proof proofList + if _, err := s.Trie(); err != nil { + return nil, err + } err := s.trie.Prove(addrHash[:], 0, &proof) return proof, err } @@ -887,6 +932,17 @@ func (s *StateDB) GetRefund() uint64 { return s.refund } +// GetRefund returns the current value of the refund counter. +func (s *StateDB) WaitPipeVerification() error { + // We need wait for the parent trie to commit + if s.snap != nil { + if valid := s.snap.WaitAndGetVerifyRes(); !valid { + return fmt.Errorf("verification on parent snap failed") + } + } + return nil +} + // Finalise finalises the state by removing the s destructed objects and clears // the journal as well as the refunds. Finalise, however, will not push any updates // into the tries just yet. Only IntermediateRoot or Commit will do that. @@ -946,22 +1002,11 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { } // Finalise all the dirty storage states and write them into the tries s.Finalise(deleteEmptyObjects) + s.AccountsIntermediateRoot() + return s.StateIntermediateRoot() +} - // If there was a trie prefetcher operating, it gets aborted and irrevocably - // modified after we start retrieving tries. Remove it from the statedb after - // this round of use. - // - // This is weird pre-byzantium since the first tx runs with a prefetcher and - // the remainder without, but pre-byzantium even the initial prefetcher is - // useless, so no sleep lost. - prefetcher := s.prefetcher - if s.prefetcher != nil { - defer func() { - s.prefetcher.close() - s.prefetcher = nil - }() - } - +func (s *StateDB) AccountsIntermediateRoot() { tasks := make(chan func()) finishCh := make(chan struct{}) defer close(finishCh) @@ -978,6 +1023,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { } }() } + // Although naively it makes sense to retrieve the account trie and then do // the contract storage and account updates sequentially, that short circuits // the account prefetcher. Instead, let's process all the storage updates @@ -1009,6 +1055,27 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { } } wg.Wait() +} + +func (s *StateDB) StateIntermediateRoot() common.Hash { + // If there was a trie prefetcher operating, it gets aborted and irrevocably + // modified after we start retrieving tries. Remove it from the statedb after + // this round of use. + // + // This is weird pre-byzantium since the first tx runs with a prefetcher and + // the remainder without, but pre-byzantium even the initial prefetcher is + // useless, so no sleep lost. + prefetcher := s.prefetcher + defer func() { + s.prefetcherLock.Lock() + if s.prefetcher != nil { + s.prefetcher.close() + s.prefetcher = nil + } + // try not use defer inside defer + s.prefetcherLock.Unlock() + }() + // Now we're about to start to write changes to the trie. The trie is so far // _untouched_. We can check with the prefetcher, if it can give us a trie // which has the same root, but also has some content loaded into it. @@ -1020,7 +1087,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { if s.trie == nil { tr, err := s.db.OpenTrie(s.originalRoot) if err != nil { - panic("Failed to open trie tree") + panic(fmt.Sprintf("Failed to open trie tree %s", s.originalRoot)) } s.trie = tr } @@ -1065,9 +1132,12 @@ func (s *StateDB) clearJournalAndRefund() { s.validRevisions = s.validRevisions[:0] // Snapshots can be created without journal entires } -func (s *StateDB) LightCommit(root common.Hash) (common.Hash, *types.DiffLayer, error) { +func (s *StateDB) LightCommit() (common.Hash, *types.DiffLayer, error) { codeWriter := s.db.TrieDB().DiskDB().NewBatch() + // light process already verified it, expectedRoot is trustworthy. + root := s.expectedRoot + commitFuncs := []func() error{ func() error { for codeHash, code := range s.diffCode { @@ -1155,7 +1225,8 @@ func (s *StateDB) LightCommit(root common.Hash) (common.Hash, *types.DiffLayer, } // Only update if there's a state transition (skip empty Clique blocks) if parent := s.snap.Root(); parent != root { - if err := s.snaps.Update(root, parent, s.snapDestructs, s.snapAccounts, s.snapStorage); err != nil { + // for light commit, always do sync commit + if err := s.snaps.Update(root, parent, s.snapDestructs, s.snapAccounts, s.snapStorage, nil); err != nil { log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err) } // Keep n diff layers in the memory @@ -1189,23 +1260,42 @@ func (s *StateDB) LightCommit(root common.Hash) (common.Hash, *types.DiffLayer, } // Commit writes the state to the underlying in-memory trie database. -func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer, error) { +func (s *StateDB) Commit(failPostCommitFunc func(), postCommitFuncs ...func() error) (common.Hash, *types.DiffLayer, error) { if s.dbErr != nil { return common.Hash{}, nil, fmt.Errorf("commit aborted due to earlier error: %v", s.dbErr) } // Finalize any pending changes and merge everything into the tries - root := s.IntermediateRoot(deleteEmptyObjects) if s.lightProcessed { - return s.LightCommit(root) + root, diff, err := s.LightCommit() + if err != nil { + return root, diff, err + } + for _, postFunc := range postCommitFuncs { + err = postFunc() + if err != nil { + return root, diff, err + } + } + return root, diff, nil } var diffLayer *types.DiffLayer + var verified chan struct{} + var snapUpdated chan struct{} if s.snap != nil { diffLayer = &types.DiffLayer{} } - commitFuncs := []func() error{ - func() error { - // Commit objects to the trie, measuring the elapsed time - tasks := make(chan func(batch ethdb.KeyValueWriter)) + if s.pipeCommit { + // async commit the MPT + verified = make(chan struct{}) + snapUpdated = make(chan struct{}) + } + + commmitTrie := func() error { + commitErr := func() error { + if s.stateRoot = s.StateIntermediateRoot(); s.fullProcessed && s.expectedRoot != s.stateRoot { + return fmt.Errorf("invalid merkle root (remote: %x local: %x)", s.expectedRoot, s.stateRoot) + } + tasks := make(chan func()) taskResults := make(chan error, len(s.stateObjectsDirty)) tasksNum := 0 finishCh := make(chan struct{}) @@ -1216,17 +1306,11 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer wg.Add(1) go func() { defer wg.Done() - codeWriter := s.db.TrieDB().DiskDB().NewBatch() for { select { case task := <-tasks: - task(codeWriter) + task() case <-finishCh: - if codeWriter.ValueSize() > 0 { - if err := codeWriter.Write(); err != nil { - log.Crit("Failed to commit dirty codes", "error", err) - } - } return } } @@ -1249,11 +1333,7 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer for addr := range s.stateObjectsDirty { if obj := s.stateObjects[addr]; !obj.deleted { // Write any contract code associated with the state object - tasks <- func(codeWriter ethdb.KeyValueWriter) { - if obj.code != nil && obj.dirtyCode { - rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code) - obj.dirtyCode = false - } + tasks <- func() { // Write any storage changes in the state object to its storage trie if _, err := obj.CommitTrie(s.db); err != nil { taskResults <- err @@ -1273,14 +1353,6 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer } close(finishCh) - if len(s.stateObjectsDirty) > 0 { - s.stateObjectsDirty = make(map[common.Address]struct{}, len(s.stateObjectsDirty)/2) - } - // Write the account trie changes, measuing the amount of wasted time - var start time.Time - if metrics.EnabledExpensive { - start = time.Now() - } // The onleaf func is called _serially_, so we can reuse the same account // for unmarshalling every time. var account types.StateAccount @@ -1296,14 +1368,60 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer if err != nil { return err } - if metrics.EnabledExpensive { - s.AccountCommits += time.Since(start) - } if root != emptyRoot { s.db.CacheAccount(root, s.trie) } + for _, postFunc := range postCommitFuncs { + err = postFunc() + if err != nil { + return err + } + } wg.Wait() return nil + }() + + if s.pipeCommit { + if commitErr == nil { + <-snapUpdated + s.snaps.Snapshot(s.stateRoot).MarkValid() + } else { + // The blockchain will do the further rewind if write block not finish yet + if failPostCommitFunc != nil { + <-snapUpdated + failPostCommitFunc() + } + log.Error("state verification failed", "err", commitErr) + } + close(verified) + } + return commitErr + } + + commitFuncs := []func() error{ + func() error { + codeWriter := s.db.TrieDB().DiskDB().NewBatch() + for addr := range s.stateObjectsDirty { + if obj := s.stateObjects[addr]; !obj.deleted { + if obj.code != nil && obj.dirtyCode { + rawdb.WriteCode(codeWriter, common.BytesToHash(obj.CodeHash()), obj.code) + obj.dirtyCode = false + if codeWriter.ValueSize() > ethdb.IdealBatchSize { + if err := codeWriter.Write(); err != nil { + return err + } + codeWriter.Reset() + } + } + } + } + if codeWriter.ValueSize() > 0 { + if err := codeWriter.Write(); err != nil { + log.Crit("Failed to commit dirty codes", "error", err) + return err + } + } + return nil }, func() error { // If snapshotting is enabled, update the snapshot tree with this new version @@ -1311,18 +1429,23 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer if metrics.EnabledExpensive { defer func(start time.Time) { s.SnapshotCommits += time.Since(start) }(time.Now()) } + if s.pipeCommit { + defer close(snapUpdated) + } // Only update if there's a state transition (skip empty Clique blocks) - if parent := s.snap.Root(); parent != root { - if err := s.snaps.Update(root, parent, s.snapDestructs, s.snapAccounts, s.snapStorage); err != nil { - log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err) + if parent := s.snap.Root(); parent != s.expectedRoot { + if err := s.snaps.Update(s.expectedRoot, parent, s.snapDestructs, s.snapAccounts, s.snapStorage, verified); err != nil { + log.Warn("Failed to update snapshot tree", "from", parent, "to", s.expectedRoot, "err", err) } // Keep n diff layers in the memory // - head layer is paired with HEAD state // - head-1 layer is paired with HEAD-1 state // - head-(n-1) layer(bottom-most diff layer) is paired with HEAD-(n-1)state - if err := s.snaps.Cap(root, s.snaps.CapLimit()); err != nil { - log.Warn("Failed to cap snapshot tree", "root", root, "layers", s.snaps.CapLimit(), "err", err) - } + go func() { + if err := s.snaps.Cap(s.expectedRoot, s.snaps.CapLimit()); err != nil { + log.Warn("Failed to cap snapshot tree", "root", s.expectedRoot, "layers", s.snaps.CapLimit(), "err", err) + } + }() } } return nil @@ -1334,6 +1457,11 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer return nil }, } + if s.pipeCommit { + go commmitTrie() + } else { + commitFuncs = append(commitFuncs, commmitTrie) + } commitRes := make(chan error, len(commitFuncs)) for _, f := range commitFuncs { tmpFunc := f @@ -1347,7 +1475,11 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, *types.DiffLayer return common.Hash{}, nil, r } } - s.snap, s.snapDestructs, s.snapAccounts, s.snapStorage = nil, nil, nil, nil + root := s.stateRoot + if s.pipeCommit { + root = s.expectedRoot + } + return root, diffLayer, nil } diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go index 24362e7b39..4b3a91cde6 100644 --- a/core/state/statedb_test.go +++ b/core/state/statedb_test.go @@ -102,7 +102,9 @@ func TestIntermediateLeaks(t *testing.T) { } // Commit and cross check the databases. - transRoot, _, err := transState.Commit(false) + transState.Finalise(false) + transState.AccountsIntermediateRoot() + transRoot, _, err := transState.Commit(nil) if err != nil { t.Fatalf("failed to commit transition state: %v", err) } @@ -110,7 +112,9 @@ func TestIntermediateLeaks(t *testing.T) { t.Errorf("can not commit trie %v to persistent database", transRoot.Hex()) } - finalRoot, _, err := finalState.Commit(false) + finalState.Finalise(false) + finalState.AccountsIntermediateRoot() + finalRoot, _, err := finalState.Commit(nil) if err != nil { t.Fatalf("failed to commit final state: %v", err) } @@ -473,7 +477,7 @@ func (test *snapshotTest) checkEqual(state, checkstate *StateDB) error { func TestTouchDelete(t *testing.T) { s := newStateTest() s.state.GetOrNewStateObject(common.Address{}) - root, _, _ := s.state.Commit(false) + root, _, _ := s.state.Commit(nil) s.state, _ = New(root, s.state.db, s.state.snaps) snapshot := s.state.Snapshot() @@ -546,7 +550,9 @@ func TestCopyCommitCopy(t *testing.T) { t.Fatalf("first copy pre-commit committed storage slot mismatch: have %x, want %x", val, common.Hash{}) } - copyOne.Commit(false) + copyOne.Finalise(false) + copyOne.AccountsIntermediateRoot() + copyOne.Commit(nil) if balance := copyOne.GetBalance(addr); balance.Cmp(big.NewInt(42)) != 0 { t.Fatalf("first copy post-commit balance mismatch: have %v, want %v", balance, 42) } @@ -631,7 +637,10 @@ func TestCopyCopyCommitCopy(t *testing.T) { if val := copyTwo.GetCommittedState(addr, skey); val != (common.Hash{}) { t.Fatalf("second copy pre-commit committed storage slot mismatch: have %x, want %x", val, common.Hash{}) } - copyTwo.Commit(false) + + copyTwo.Finalise(false) + copyTwo.AccountsIntermediateRoot() + copyTwo.Commit(nil) if balance := copyTwo.GetBalance(addr); balance.Cmp(big.NewInt(42)) != 0 { t.Fatalf("second copy post-commit balance mismatch: have %v, want %v", balance, 42) } @@ -675,7 +684,9 @@ func TestDeleteCreateRevert(t *testing.T) { addr := common.BytesToAddress([]byte("so")) state.SetBalance(addr, big.NewInt(1)) - root, _, _ := state.Commit(false) + state.Finalise(false) + state.AccountsIntermediateRoot() + root, _, _ := state.Commit(nil) state, _ = New(root, state.db, state.snaps) // Simulate self-destructing in one transaction, then create-reverting in another @@ -686,8 +697,10 @@ func TestDeleteCreateRevert(t *testing.T) { state.SetBalance(addr, big.NewInt(2)) state.RevertToSnapshot(id) + state.Finalise(true) + state.AccountsIntermediateRoot() // Commit the entire state and make sure we don't crash and have the correct state - root, _, _ = state.Commit(true) + root, _, _ = state.Commit(nil) state, _ = New(root, state.db, state.snaps) if state.getStateObject(addr) != nil { @@ -712,7 +725,9 @@ func TestMissingTrieNodes(t *testing.T) { a2 := common.BytesToAddress([]byte("another")) state.SetBalance(a2, big.NewInt(100)) state.SetCode(a2, []byte{1, 2, 4}) - root, _, _ = state.Commit(false) + state.Finalise(false) + state.AccountsIntermediateRoot() + root, _, _ = state.Commit(nil) t.Logf("root: %x", root) // force-flush state.Database().TrieDB().Cap(0) @@ -736,7 +751,9 @@ func TestMissingTrieNodes(t *testing.T) { } // Modify the state state.SetBalance(addr, big.NewInt(2)) - root, _, err := state.Commit(false) + state.Finalise(false) + state.AccountsIntermediateRoot() + root, _, err := state.Commit(nil) if err == nil { t.Fatalf("expected error, got root :%x", root) } diff --git a/core/state/sync_test.go b/core/state/sync_test.go index a692834eee..0bdf8d468c 100644 --- a/core/state/sync_test.go +++ b/core/state/sync_test.go @@ -69,7 +69,9 @@ func makeTestState() (Database, common.Hash, []*testAccount) { state.updateStateObject(obj) accounts = append(accounts, acc) } - root, _, _ := state.Commit(false) + state.Finalise(false) + state.AccountsIntermediateRoot() + root, _, _ := state.Commit(nil) // Return the generated state return db, root, accounts diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 58cd3c9187..44f90940fd 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -20,7 +20,6 @@ import ( "sync" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" ) @@ -106,7 +105,7 @@ func (p *triePrefetcher) close() { for _, fetcher := range p.fetchers { p.abortChan <- fetcher // safe to do multiple times <-fetcher.term - if metrics.Enabled { + if metrics.EnabledExpensive { if fetcher.root == p.root { p.accountLoadMeter.Mark(int64(len(fetcher.seen))) p.accountDupMeter.Mark(int64(fetcher.dups)) @@ -257,9 +256,7 @@ func newSubfetcher(db Database, root common.Hash, accountHash common.Hash) *subf seen: make(map[string]struct{}), accountHash: accountHash, } - gopool.Submit(func() { - sf.loop() - }) + go sf.loop() return sf } @@ -322,8 +319,7 @@ func (sf *subfetcher) loop() { trie, err = sf.db.OpenStorageTrie(sf.accountHash, sf.root) } if err != nil { - log.Warn("Trie prefetcher failed opening trie", "root", sf.root, "err", err) - return + log.Debug("Trie prefetcher failed opening trie", "root", sf.root, "err", err) } sf.trie = trie @@ -332,6 +328,18 @@ func (sf *subfetcher) loop() { select { case <-sf.wake: // Subfetcher was woken up, retrieve any tasks to avoid spinning the lock + if sf.trie == nil { + if sf.accountHash == emptyAddr { + sf.trie, err = sf.db.OpenTrie(sf.root) + } else { + // address is useless + sf.trie, err = sf.db.OpenStorageTrie(sf.accountHash, sf.root) + } + if err != nil { + continue + } + } + sf.lock.Lock() tasks := sf.tasks sf.tasks = nil diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index 989303a035..85f9a40c45 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -17,7 +17,6 @@ package core import ( - "runtime" "sync/atomic" "github.com/ethereum/go-ethereum/consensus" @@ -27,6 +26,8 @@ import ( "github.com/ethereum/go-ethereum/params" ) +const prefetchThread = 2 + // statePrefetcher is a basic Prefetcher, which blindly executes a block on top // of an arbitrary state with the goal of prefetching potentially useful state // data from disk before the main block processor start executing. @@ -54,25 +55,23 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c signer = types.MakeSigner(p.config, header.Number) ) transactions := block.Transactions() - threads := runtime.NumCPU() - batch := len(transactions) / (threads + 1) - if batch == 0 { - return + sortTransactions := make([][]*types.Transaction, prefetchThread) + for i := 0; i < prefetchThread; i++ { + sortTransactions[i] = make([]*types.Transaction, 0, len(transactions)/prefetchThread) + } + for idx := range transactions { + threadIdx := idx % prefetchThread + sortTransactions[threadIdx] = append(sortTransactions[threadIdx], transactions[idx]) } // No need to execute the first batch, since the main processor will do it. - for i := 1; i <= threads; i++ { - start := i * batch - end := (i + 1) * batch - if i == threads { - end = len(transactions) - } - go func(start, end int) { + for i := 0; i < prefetchThread; i++ { + go func(idx int) { newStatedb := statedb.Copy() gaspool := new(GasPool).AddGas(block.GasLimit()) blockContext := NewEVMBlockContext(header, p.bc, nil) evm := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, cfg) // Iterate over and process the individual transactions - for i, tx := range transactions[start:end] { + for i, tx := range sortTransactions[idx] { // If block precaching was interrupted, abort if interrupt != nil && atomic.LoadUint32(interrupt) == 1 { return @@ -83,22 +82,18 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c return // Also invalid block, bail out } newStatedb.Prepare(tx.Hash(), i) - if err := precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm); err != nil { - return // Ugh, something went horribly wrong, bail out - } + precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm) } - }(start, end) + }(i) } - } // precacheTransaction attempts to apply a transaction to the given state database // and uses the input parameters for its environment. The goal is not to execute // the transaction successfully, rather to warm up touched data slots. -func precacheTransaction(msg types.Message, config *params.ChainConfig, gaspool *GasPool, statedb *state.StateDB, header *types.Header, evm *vm.EVM) error { +func precacheTransaction(msg types.Message, config *params.ChainConfig, gaspool *GasPool, statedb *state.StateDB, header *types.Header, evm *vm.EVM) { // Update the evm with the new transaction context. evm.Reset(NewEVMTxContext(msg), statedb) // Add addresses to access list if applicable - _, err := ApplyMessage(evm, msg, gaspool) - return err + ApplyMessage(evm, msg, gaspool) } diff --git a/core/state_processor.go b/core/state_processor.go index 60defbde9f..30247a9f4f 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -123,6 +123,10 @@ func (p *LightStateProcessor) Process(block *types.Block, statedb *state.StateDB statedb.StopPrefetcher() parent := p.bc.GetHeader(block.ParentHash(), block.NumberU64()-1) statedb, err = state.New(parent.Root, p.bc.stateCache, p.bc.snaps) + statedb.SetExpectedStateRoot(block.Root()) + if p.bc.pipeCommit { + statedb.EnablePipeCommit() + } if err != nil { return statedb, nil, nil, 0, err } @@ -148,9 +152,12 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty for _, c := range diffLayer.Codes { fullDiffCode[c.Hash] = c.Code } - + stateTrie, err := statedb.Trie() + if err != nil { + return nil, nil, 0, err + } for des := range snapDestructs { - statedb.Trie().TryDelete(des[:]) + stateTrie.TryDelete(des[:]) } threads := gopool.Threads(len(snapAccounts)) @@ -191,7 +198,7 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty // fetch previous state var previousAccount types.StateAccount stateMux.Lock() - enc, err := statedb.Trie().TryGet(diffAccount[:]) + enc, err := stateTrie.TryGet(diffAccount[:]) stateMux.Unlock() if err != nil { errChan <- err @@ -303,7 +310,7 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty return } stateMux.Lock() - err = statedb.Trie().TryUpdate(diffAccount[:], bz) + err = stateTrie.TryUpdate(diffAccount[:], bz) stateMux.Unlock() if err != nil { errChan <- err @@ -330,7 +337,7 @@ func (p *LightStateProcessor) LightProcess(diffLayer *types.DiffLayer, block *ty } // Do validate in advance so that we can fall back to full process - if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed); err != nil { + if err := p.bc.validator.ValidateState(block, statedb, diffLayer.Receipts, gasUsed, false); err != nil { log.Error("validate state failed during diff sync", "error", err) return nil, nil, 0, err } @@ -379,6 +386,8 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg allLogs []*types.Log gp = new(GasPool).AddGas(block.GasLimit()) ) + signer := types.MakeSigner(p.bc.chainConfig, block.Number()) + statedb.TryPreload(block, signer) var receipts = make([]*types.Receipt, 0) // Mutate the block and state according to any hard-fork specs if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 { @@ -397,6 +406,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg // initilise bloom processors bloomProcessors := NewAsyncReceiptBloomGenerator(txNum) + statedb.MarkFullProcessed() // usually do have two tx, one for validator set contract, another for system reward contract. systemTxs := make([]*types.Transaction, 0, 2) diff --git a/core/types.go b/core/types.go index 49bd58e086..5ed4817e68 100644 --- a/core/types.go +++ b/core/types.go @@ -31,7 +31,7 @@ type Validator interface { // ValidateState validates the given statedb and optionally the receipts and // gas used. - ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error + ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64, skipHeavyVerify bool) error } // Prefetcher is an interface for pre-caching transaction signatures and state. diff --git a/eth/api_test.go b/eth/api_test.go index 39a1d58460..75a0da062c 100644 --- a/eth/api_test.go +++ b/eth/api_test.go @@ -83,7 +83,9 @@ func TestAccountRange(t *testing.T) { m[addr] = true } } - state.Commit(true) + state.Finalise(true) + state.AccountsIntermediateRoot() + state.Commit(nil) root := state.IntermediateRoot(true) trie, err := statedb.OpenTrie(root) @@ -140,7 +142,7 @@ func TestEmptyAccountRange(t *testing.T) { statedb = state.NewDatabase(rawdb.NewMemoryDatabase()) st, _ = state.New(common.Hash{}, statedb, nil) ) - st.Commit(true) + st.Commit(nil) st.IntermediateRoot(true) results := st.IteratorDump(&state.DumpConfig{ SkipCode: true, diff --git a/eth/backend.go b/eth/backend.go index f67bdabd7c..8609afa71c 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -209,6 +209,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if config.DiffSync { bcOps = append(bcOps, core.EnableLightProcessor) } + if config.PipeCommit { + bcOps = append(bcOps, core.EnablePipelineCommit) + } if config.PersistDiff { bcOps = append(bcOps, core.EnablePersistDiff(config.DiffBlock)) } diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index d4f72e0448..1dcd5800f8 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -140,6 +140,7 @@ type Config struct { DirectBroadcast bool DisableSnapProtocol bool //Whether disable snap protocol DiffSync bool // Whether support diff sync + PipeCommit bool RangeLimit bool TxLookupLimit uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved. diff --git a/eth/state_accessor.go b/eth/state_accessor.go index 0b5406a028..9da4b94bf8 100644 --- a/eth/state_accessor.go +++ b/eth/state_accessor.go @@ -138,7 +138,9 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state return nil, fmt.Errorf("processing block %d failed: %v", current.NumberU64(), err) } // Finalize the state so any modifications are written to the trie - root, _, err := statedb.Commit(eth.blockchain.Config().IsEIP158(current.Number())) + statedb.Finalise(eth.blockchain.Config().IsEIP158(current.Number())) + statedb.AccountsIntermediateRoot() + root, _, err := statedb.Commit(nil) if err != nil { return nil, fmt.Errorf("stateAtBlock commit failed, number %d root %v: %w", current.NumberU64(), current.Root().Hex(), err) diff --git a/eth/tracers/api.go b/eth/tracers/api.go index 996f280067..f6a5a4f9b3 100644 --- a/eth/tracers/api.go +++ b/eth/tracers/api.go @@ -546,7 +546,9 @@ func (api *API) IntermediateRoots(ctx context.Context, hash common.Hash, config } // calling IntermediateRoot will internally call Finalize on the state // so any modifications are written to the trie - roots = append(roots, statedb.IntermediateRoot(deleteEmptyObjects)) + root := statedb.IntermediateRoot(deleteEmptyObjects) + + roots = append(roots, root) } return roots, nil } diff --git a/ethclient/ethclient_test.go b/ethclient/ethclient_test.go index 828e19db63..300876769b 100644 --- a/ethclient/ethclient_test.go +++ b/ethclient/ethclient_test.go @@ -272,6 +272,7 @@ func newTestBackend(t *testing.T) (*node.Node, []*types.Block) { config := ðconfig.Config{Genesis: genesis} config.Ethash.PowMode = ethash.ModeFake config.SnapshotCache = 256 + config.TriesInMemory = 128 ethservice, err := eth.New(n, config) if err != nil { t.Fatalf("can't create new ethereum service: %v", err) diff --git a/miner/worker.go b/miner/worker.go index 4f772cad0c..609c6874c5 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -751,6 +751,7 @@ func (w *worker) resultLoop() { logs = append(logs, receipt.Logs...) } // Commit block and state to database. + task.state.SetExpectedStateRoot(block.Root()) _, err := w.chain.WriteBlockAndSetHead(block, receipts, logs, task.state, true) if err != nil { log.Error("Failed writing block to chain", "err", err) @@ -1187,6 +1188,10 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti env := env.copy() s := w.current.state // TODO set uncle to nil here + err := s.WaitPipeVerification() + if err != nil { + return err + } block, receipts, err := w.engine.FinalizeAndAssemble(w.chain, types.CopyHeader(w.current.header), s, w.current.txs, nil, w.current.receipts) if err != nil { return err diff --git a/params/protocol_params.go b/params/protocol_params.go index b296cc93ce..e244c24231 100644 --- a/params/protocol_params.go +++ b/params/protocol_params.go @@ -129,7 +129,6 @@ const ( // Precompiled contract gas prices - //TODO need further discussion TendermintHeaderValidateGas uint64 = 3000 // Gas for validate tendermiint consensus state IAVLMerkleProofValidateGas uint64 = 3000 // Gas for validate merkle proof diff --git a/tests/state_test_util.go b/tests/state_test_util.go index 586e6275f9..3e66a1c55b 100644 --- a/tests/state_test_util.go +++ b/tests/state_test_util.go @@ -235,7 +235,9 @@ func (t *StateTest) RunNoVerify(subtest StateSubtest, vmconfig vm.Config, snapsh } // Commit block - statedb.Commit(config.IsEIP158(block.Number())) + statedb.Finalise(config.IsEIP158(block.Number())) + statedb.AccountsIntermediateRoot() + statedb.Commit(nil) // Add 0-value mining reward. This only makes a difference in the cases // where // - the coinbase suicided, or @@ -263,7 +265,9 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc, snapshotter boo } } // Commit and re-open to start with a clean state. - root, _, _ := statedb.Commit(false) + statedb.Finalise(false) + statedb.AccountsIntermediateRoot() + root, _, _ := statedb.Commit(nil) var snaps *snapshot.Tree if snapshotter { diff --git a/trie/database.go b/trie/database.go index dc9719165f..0f4351a8a6 100644 --- a/trie/database.go +++ b/trie/database.go @@ -605,14 +605,16 @@ func (db *Database) Cap(limit common.StorageSize) error { // outside code doesn't see an inconsistent state (referenced data removed from // memory cache during commit but not yet in persistent storage). This is ensured // by only uncaching existing data when the database write finalizes. + db.lock.RLock() nodes, storage, start := len(db.dirties), db.dirtiesSize, time.Now() - batch := db.diskdb.NewBatch() - // db.dirtiesSize only contains the useful data in the cache, but when reporting // the total memory consumption, the maintenance metadata is also needed to be // counted. size := db.dirtiesSize + common.StorageSize((len(db.dirties)-1)*cachedNodeSize) size += db.childrenSize - common.StorageSize(len(db.dirties[common.Hash{}].children)*(common.HashLength+2)) + db.lock.RUnlock() + + batch := db.diskdb.NewBatch() // If the preimage cache got large enough, push to disk. If it's still small // leave for later to deduplicate writes. @@ -632,27 +634,35 @@ func (db *Database) Cap(limit common.StorageSize) error { } // Keep committing nodes from the flush-list until we're below allowance oldest := db.oldest - for size > limit && oldest != (common.Hash{}) { - // Fetch the oldest referenced node and push into the batch - node := db.dirties[oldest] - rawdb.WriteTrieNode(batch, oldest, node.rlp()) - - // If we exceeded the ideal batch size, commit and reset - if batch.ValueSize() >= ethdb.IdealBatchSize { - if err := batch.Write(); err != nil { - log.Error("Failed to write flush list to disk", "err", err) - return err + err := func() error { + db.lock.RLock() + defer db.lock.RUnlock() + for size > limit && oldest != (common.Hash{}) { + // Fetch the oldest referenced node and push into the batch + node := db.dirties[oldest] + rawdb.WriteTrieNode(batch, oldest, node.rlp()) + + // If we exceeded the ideal batch size, commit and reset + if batch.ValueSize() >= ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { + log.Error("Failed to write flush list to disk", "err", err) + return err + } + batch.Reset() } - batch.Reset() - } - // Iterate to the next flush item, or abort if the size cap was achieved. Size - // is the total size, including the useful cached data (hash -> blob), the - // cache item metadata, as well as external children mappings. - size -= common.StorageSize(common.HashLength + int(node.size) + cachedNodeSize) - if node.children != nil { - size -= common.StorageSize(cachedNodeChildrenSize + len(node.children)*(common.HashLength+2)) + // Iterate to the next flush item, or abort if the size cap was achieved. Size + // is the total size, including the useful cached data (hash -> blob), the + // cache item metadata, as well as external children mappings. + size -= common.StorageSize(common.HashLength + int(node.size) + cachedNodeSize) + if node.children != nil { + size -= common.StorageSize(cachedNodeChildrenSize + len(node.children)*(common.HashLength+2)) + } + oldest = node.flushNext } - oldest = node.flushNext + return nil + }() + if err != nil { + return err } // Flush out any remainder data from the last batch if err := batch.Write(); err != nil { @@ -722,7 +732,9 @@ func (db *Database) Commit(node common.Hash, report bool, callback func(common.H batch.Reset() } // Move the trie itself into the batch, flushing if enough data is accumulated + db.lock.RLock() nodes, storage := len(db.dirties), db.dirtiesSize + db.lock.RUnlock() uncacher := &cleaner{db} if err := db.commit(node, batch, uncacher, callback); err != nil { @@ -766,10 +778,14 @@ func (db *Database) Commit(node common.Hash, report bool, callback func(common.H // commit is the private locked version of Commit. func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleaner, callback func(common.Hash)) error { // If the node does not exist, it's a previously committed node + db.lock.RLock() node, ok := db.dirties[hash] if !ok { + db.lock.RUnlock() return nil } + db.lock.RUnlock() + var err error node.forChilds(func(child common.Hash) { if err == nil {