diff --git a/core/blockchain.go b/core/blockchain.go index 8b8f7bfe4..ad3c90457 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1896,6 +1896,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) defer func() { DebugInnerExecutionDuration = 0 }() + + if bc.serialProcessor == nil { + bc.serialProcessor = bc.processor + } + for ; block != nil && err == nil || errors.Is(err, ErrKnownBlock); block, err = it.next() { DebugInnerExecutionDuration = 0 // If the chain is terminating, stop processing blocks @@ -1961,6 +1966,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) err error ) + blockProcessedInParallel := false // skip block process if we already have the state, receipts and logs from mining work if !(receiptExist && logExist && stateExist) { // Retrieve the parent block and it's state to execute on top @@ -2010,8 +2016,26 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) if useSerialProcessor { receipts, logs, usedGas, err = bc.serialProcessor.Process(block, statedb, bc.vmConfig) + blockProcessedInParallel = false } else { receipts, logs, usedGas, err = bc.processor.Process(block, statedb, bc.vmConfig) + blockProcessedInParallel = true + if err != nil { + // parallel processing fail , fallback to serial with new statDB. + log.Warn("ParallelEVM fallback to serial process", "error", err.Error()) + execErr := err + statedb, err = bc.reGenerateStateForFallBack(parent.Root, block.Root(), statedb) + if err != nil { + // Can not get new statedb for serial run, report the process error. + bc.reportBlock(block, receipts, execErr) + followupInterrupt.Store(true) + return it.index, err + } + statedb.StartPrefetcher("chain") + activeState = statedb + receipts, logs, usedGas, err = bc.serialProcessor.Process(block, statedb, bc.vmConfig) + blockProcessedInParallel = false + } } if err != nil { bc.reportBlock(block, receipts, err) @@ -2023,9 +2047,43 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error) vstart := time.Now() if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { - bc.reportBlock(block, receipts, err) - followupInterrupt.Store(true) - return it.index, err + if blockProcessedInParallel { + // invalid parallel execution, try serial + log.Warn("ParallelEVM fallback to serial process after ValidateState", "error", err.Error()) + parent := it.previous() + if parent == nil { + parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) + } + + validateErr := err + statedb, err = bc.reGenerateStateForFallBack(parent.Root, block.Root(), statedb) + if err != nil { + // can not get new statedb for serial run, report the validate error. + bc.reportBlock(block, receipts, validateErr) + followupInterrupt.Store(true) + return it.index, err + } + statedb.StartPrefetcher("chain") + activeState = statedb + receipts, logs, usedGas, err = bc.serialProcessor.Process(block, statedb, bc.vmConfig) + if err != nil { + // serial process with process error. + bc.reportBlock(block, receipts, err) + followupInterrupt.Store(true) + return it.index, err + } + blockProcessedInParallel = false + if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil { + // serial process with validation error. + bc.reportBlock(block, receipts, err) + followupInterrupt.Store(true) + return it.index, err + } + } else { + bc.reportBlock(block, receipts, err) + followupInterrupt.Store(true) + return it.index, err + } } vtime := time.Since(vstart) @@ -2921,6 +2979,17 @@ func (bc *BlockChain) SetupTxDAGGeneration(output string, readFile bool) { } +func (bc *BlockChain) reGenerateStateForFallBack(parentRoot common.Hash, blockRoot common.Hash, oldDB *state.StateDB) (*state.StateDB, error) { + oldDB.StopPrefetcher() + statedb, err := state.New(parentRoot, bc.stateCache, bc.snaps) + if err != nil { + return nil, err + } + + statedb.SetExpectedStateRoot(blockRoot) + return statedb, nil +} + type TxDAGOutputItem struct { blockNumber uint64 txDAG types.TxDAG