Skip to content

Commit

Permalink
Feat: Parallel Transaction Execution Implementation
Browse files Browse the repository at this point in the history
This PR implement the Parallel EVM engine

Co-authored-by: setunapo
Co-authored-by: sunny2022da
Co-authored-by: galaio
Co-authored-by: andyzhang2023
  • Loading branch information
sunny2022da committed Sep 25, 2024
1 parent e3170ae commit ecaef53
Show file tree
Hide file tree
Showing 62 changed files with 5,032 additions and 291 deletions.
2 changes: 2 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ var (
utils.RollupComputePendingBlock,
utils.RollupHaltOnIncompatibleProtocolVersionFlag,
utils.RollupSuperchainUpgradesFlag,
utils.ParallelTxFlag,
utils.ParallelTxNumFlag,
configFileFlag,
utils.LogDebugFlag,
utils.LogBacktraceAtFlag,
Expand Down
34 changes: 34 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"net/http"
"os"
"path/filepath"
"runtime"
godebug "runtime/debug"
"strconv"
"strings"
Expand Down Expand Up @@ -1093,6 +1094,18 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
Category: flags.MetricsCategory,
}

ParallelTxFlag = &cli.BoolFlag{
Name: "parallel",
Usage: "Enable the experimental parallel transaction execution mode, only valid in full sync mode (default = false)",
Category: flags.VMCategory,
}

ParallelTxNumFlag = &cli.IntFlag{
Name: "parallel.num",
Usage: "Number of slot for transaction execution, only valid in parallel mode (runtime calculated, no fixed default value)",
Category: flags.VMCategory,
}

VMOpcodeOptimizeFlag = &cli.BoolFlag{
Name: "vm.opcode.optimize",
Usage: "enable opcode optimization",
Expand Down Expand Up @@ -1983,6 +1996,27 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
cfg.EnablePreimageRecording = ctx.Bool(VMEnableDebugFlag.Name)
}

if ctx.IsSet(ParallelTxFlag.Name) {
cfg.ParallelTxMode = ctx.Bool(ParallelTxFlag.Name)
// The best prallel num will be tuned later, we do a simple parallel num set here
numCpu := runtime.NumCPU()
var parallelNum int
if ctx.IsSet(ParallelTxNumFlag.Name) {
// first of all, we use "--parallel.num", but "--parallel.num 0" is not allowed
parallelNum = ctx.Int(ParallelTxNumFlag.Name)
if parallelNum < 1 {
parallelNum = 1
}
} else if numCpu == 1 {
parallelNum = 1 // single CPU core
} else if numCpu < 10 {
parallelNum = numCpu - 1
} else {
parallelNum = 8 // we found concurrency 8 is slightly better than 15
}
cfg.ParallelTxNum = parallelNum
}

if ctx.IsSet(VMOpcodeOptimizeFlag.Name) {
cfg.EnableOpcodeOptimizing = ctx.Bool(VMOpcodeOptimizeFlag.Name)
if cfg.EnableOpcodeOptimizing {
Expand Down
6 changes: 3 additions & 3 deletions consensus/clique/clique_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestReimportMirroredState(t *testing.T) {
copy(genspec.ExtraData[extraVanity:], addr[:])

// Generate a batch of blocks, each properly signed
chain, _ := core.NewBlockChain(rawdb.NewMemoryDatabase(), nil, genspec, nil, engine, vm.Config{}, nil, nil)
chain, _ := core.NewBlockChain(rawdb.NewMemoryDatabase(), nil, genspec, nil, engine, vm.Config{EnableParallelExec: true, ParallelTxNum: 1}, nil, nil)
defer chain.Stop()

_, blocks, _ := core.GenerateChainWithGenesis(genspec, engine, 3, func(i int, block *core.BlockGen) {
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestReimportMirroredState(t *testing.T) {
}
// Insert the first two blocks and make sure the chain is valid
db = rawdb.NewMemoryDatabase()
chain, _ = core.NewBlockChain(db, nil, genspec, nil, engine, vm.Config{}, nil, nil)
chain, _ = core.NewBlockChain(db, nil, genspec, nil, engine, vm.Config{EnableParallelExec: true, ParallelTxNum: 1}, nil, nil)
defer chain.Stop()

if _, err := chain.InsertChain(blocks[:2]); err != nil {
Expand All @@ -100,7 +100,7 @@ func TestReimportMirroredState(t *testing.T) {
// Simulate a crash by creating a new chain on top of the database, without
// flushing the dirty states out. Insert the last block, triggering a sidechain
// reimport.
chain, _ = core.NewBlockChain(db, nil, genspec, nil, engine, vm.Config{}, nil, nil)
chain, _ = core.NewBlockChain(db, nil, genspec, nil, engine, vm.Config{EnableParallelExec: true, ParallelTxNum: 1}, nil, nil)
defer chain.Stop()

if _, err := chain.InsertChain(blocks[2:]); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion consensus/clique/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func (tt *cliqueTest) run(t *testing.T) {
batches[len(batches)-1] = append(batches[len(batches)-1], block)
}
// Pass all the headers through clique and ensure tallying succeeds
chain, err := core.NewBlockChain(rawdb.NewMemoryDatabase(), nil, genesis, nil, engine, vm.Config{}, nil, nil)
chain, err := core.NewBlockChain(rawdb.NewMemoryDatabase(), nil, genesis, nil, engine, vm.Config{EnableParallelExec: true, ParallelTxNum: 1}, nil, nil)
if err != nil {
t.Fatalf("failed to create test chain: %v", err)
}
Expand Down
10 changes: 8 additions & 2 deletions consensus/ethash/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,10 +514,16 @@ func (ethash *Ethash) FinalizeAndAssemble(chain consensus.ChainHeaderReader, hea
}
// Finalize block
ethash.Finalize(chain, header, state, txs, uncles, nil)

/*
js, _ := header.MarshalJSON()
fmt.Printf("== Dav -- ethash FinalizeAndAssemble, before Root update, Root %s, header json: %s\n", header.Root, js)
*/
// Assign the final state root to header.
header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number))

/*
js, _ = header.MarshalJSON()
fmt.Printf(" == Dav -- ethash FinalizeAndAssemble, after Root update, Root %s, header json: %s\n", header.Root, js)
*/
// Header seems complete, assemble into a block and return
return types.NewBlock(header, txs, uncles, receipts, trie.NewStackTrie(nil)), nil
}
Expand Down
4 changes: 3 additions & 1 deletion core/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) {

// Time the insertion of the new chain.
// State and blocks are stored in the same DB.
chainman, _ := NewBlockChain(db, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, nil, nil)
chainman, _ := NewBlockChain(db, nil, gspec, nil, ethash.NewFaker(), vm.Config{EnableParallelExec: true, ParallelTxNum: 1}, nil, nil)
defer chainman.Stop()
b.ReportAllocs()
b.ResetTimer()
Expand Down Expand Up @@ -312,7 +312,9 @@ func benchReadChain(b *testing.B, full bool, count uint64) {
if err != nil {
b.Fatalf("error opening database at %v: %v", dir, err)
}

chain, err := NewBlockChain(db, &cacheConfig, genesis, nil, ethash.NewFaker(), vm.Config{}, nil, nil)

if err != nil {
b.Fatalf("error creating chain: %v", err)
}
Expand Down
7 changes: 4 additions & 3 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,10 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
func() error {
// Validate the state root against the received state root and throw
// an error if they don't match.
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error())
}
// @TODO shall we disable it?
//if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
// return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error())
//}
return nil
},
}
Expand Down
4 changes: 2 additions & 2 deletions core/block_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func testHeaderVerification(t *testing.T, scheme string) {
headers[i] = block.Header()
}
// Run the header checker for blocks one-by-one, checking for both valid and invalid nonces
chain, _ := NewBlockChain(rawdb.NewMemoryDatabase(), DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil, nil)
chain, _ := NewBlockChain(rawdb.NewMemoryDatabase(), DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{EnableParallelExec: true, ParallelTxNum: 1}, nil, nil)
defer chain.Stop()

for i := 0; i < len(blocks); i++ {
Expand Down Expand Up @@ -163,7 +163,7 @@ func testHeaderVerificationForMerging(t *testing.T, isClique bool) {
t.Logf("Post-merge header: %d", block.NumberU64())
}
// Run the header checker for blocks one-by-one, checking for both valid and invalid nonces
chain, _ := NewBlockChain(rawdb.NewMemoryDatabase(), nil, gspec, nil, engine, vm.Config{}, nil, nil)
chain, _ := NewBlockChain(rawdb.NewMemoryDatabase(), nil, gspec, nil, engine, vm.Config{EnableParallelExec: true, ParallelTxNum: 1}, nil, nil)
defer chain.Stop()

// Verify the blocks before the merging
Expand Down
42 changes: 33 additions & 9 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ var (
errChainStopped = errors.New("blockchain is stopped")
errInvalidOldChain = errors.New("invalid old chain")
errInvalidNewChain = errors.New("invalid new chain")

ParallelTxMode = false // parallel transaction execution
)

const (
Expand Down Expand Up @@ -288,12 +290,13 @@ type BlockChain struct {
stopping atomic.Bool // false if chain is running, true when stopped
procInterrupt atomic.Bool // interrupt signaler for block processing

engine consensus.Engine
validator Validator // Block and state validator interface
prefetcher Prefetcher
processor Processor // Block transaction processor interface
forker *ForkChoice
vmConfig vm.Config
engine consensus.Engine
validator Validator // Block and state validator interface
prefetcher Prefetcher
processor Processor // Block transaction processor interface
forker *ForkChoice
vmConfig vm.Config
parallelExecution bool
}

// NewBlockChain returns a fully initialised block chain using information
Expand Down Expand Up @@ -358,7 +361,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.stateCache = state.NewDatabaseWithNodeDB(bc.db, bc.triedb)
bc.validator = NewBlockValidator(chainConfig, bc, engine)
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
bc.processor = NewStateProcessor(chainConfig, bc, engine)

err := proofKeeper.Start(bc, db)
if err != nil {
Expand Down Expand Up @@ -512,6 +514,12 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Root)
}

if vmConfig.EnableParallelExec {
bc.EnableParallelProcessor(vmConfig.ParallelTxNum)
} else {
bc.processor = NewStateProcessor(chainConfig, bc, engine)
}

// Start future block processor.
bc.wg.Add(1)
go bc.updateFutureBlocks()
Expand Down Expand Up @@ -1556,6 +1564,7 @@ func (bc *BlockChain) WriteBlockAndSetHead(block *types.Block, receipts []*types
// writeBlockAndSetHead is the internal implementation of WriteBlockAndSetHead.
// This function expects the chain mutex to be held.
func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) {

if err := bc.writeBlockWithState(block, receipts, state); err != nil {
return NonStatTy, err
}
Expand Down Expand Up @@ -1597,6 +1606,7 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types
} else {
bc.chainSideFeed.Send(ChainSideEvent{Block: block})
}

return status, nil
}

Expand Down Expand Up @@ -1738,7 +1748,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
return it.index, err
}
lastCanon = block

block, err = it.next()
}
// Falls through to the block import
Expand Down Expand Up @@ -1878,7 +1887,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)

// If we have a followup block, run that against the current state to pre-cache
// transactions and probabilistically some of the account/storage trie nodes.
if !bc.cacheConfig.TrieCleanNoPrefetch {
// parallel mode has a pipeline, similar to this prefetch, to save CPU we disable this prefetch for parallel
if !bc.cacheConfig.TrieCleanNoPrefetch && !bc.parallelExecution {
if followup, err := it.peek(); followup != nil && err == nil {
throwaway, _ := state.New(parent.Root, bc.stateCache, bc.snaps)

Expand Down Expand Up @@ -1907,6 +1917,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
ptime := time.Since(pstart)

vstart := time.Now()

if err := bc.validator.ValidateState(block, statedb, receipts, usedGas); err != nil {
bc.reportBlock(block, receipts, err)
followupInterrupt.Store(true)
Expand Down Expand Up @@ -2598,6 +2609,19 @@ func (bc *BlockChain) GetTrieFlushInterval() time.Duration {
return time.Duration(bc.flushInterval.Load())
}

func (bc *BlockChain) EnableParallelProcessor(parallelNum int) (*BlockChain, error) {
/*
if bc.snaps == nil {
// disable parallel processor if snapshot is not enabled to avoid concurrent issue for SecureTrie
log.Info("parallel processor is not enabled since snapshot is not enabled")
return bc, nil
}
*/
bc.parallelExecution = true
bc.processor = NewParallelStateProcessor(bc.Config(), bc, bc.engine, parallelNum)
return bc, nil
}

func (bc *BlockChain) NoTries() bool {
return bc.stateCache.NoTries()
}
Expand Down
12 changes: 8 additions & 4 deletions core/blockchain_repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1794,13 +1794,17 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
config.SnapshotLimit = 256
config.SnapshotWait = true
}
chain, err := NewBlockChain(db, config, gspec, nil, engine, vm.Config{}, nil, nil)
chain, err := NewBlockChain(db, config, gspec, nil, engine, vm.Config{EnableParallelExec: true, ParallelTxNum: 1}, nil, nil)
if err != nil {
t.Fatalf("Failed to create chain: %v", err)
}

// fmt.Printf("Dav -- test -- testRepairWithScheme -- chain after NewBlockChain processor: %v, parallel: %v, vmConfig: %v\n", chain.processor, chain.parallelExecution, chain.vmConfig)

// If sidechain blocks are needed, make a light chain and import it
var sideblocks types.Blocks
if tt.sidechainBlocks > 0 {
//fmt.Printf("Dav -- test -- testRepairWithScheme -- tt.sidechainBlocks: %d\n", tt.sidechainBlocks)
sideblocks, _ = GenerateChain(gspec.Config, gspec.ToBlock(), engine, rawdb.NewMemoryDatabase(), tt.sidechainBlocks, func(i int, b *BlockGen) {
b.SetCoinbase(common.Address{0x01})
})
Expand Down Expand Up @@ -1855,7 +1859,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
}
defer db.Close()

newChain, err := NewBlockChain(db, config, gspec, nil, engine, vm.Config{}, nil, nil)
newChain, err := NewBlockChain(db, config, gspec, nil, engine, vm.Config{EnableParallelExec: true, ParallelTxNum: 1}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
Expand Down Expand Up @@ -1927,7 +1931,7 @@ func testIssue23496(t *testing.T, scheme string) {
}
engine = ethash.NewFullFaker()
)
chain, err := NewBlockChain(db, DefaultCacheConfigWithScheme(scheme), gspec, nil, engine, vm.Config{}, nil, nil)
chain, err := NewBlockChain(db, DefaultCacheConfigWithScheme(scheme), gspec, nil, engine, vm.Config{EnableParallelExec: true, ParallelTxNum: 1}, nil, nil)
if err != nil {
t.Fatalf("Failed to create chain: %v", err)
}
Expand Down Expand Up @@ -1977,7 +1981,7 @@ func testIssue23496(t *testing.T, scheme string) {
}
defer db.Close()

chain, err = NewBlockChain(db, DefaultCacheConfigWithScheme(scheme), gspec, nil, engine, vm.Config{}, nil, nil)
chain, err = NewBlockChain(db, DefaultCacheConfigWithScheme(scheme), gspec, nil, engine, vm.Config{EnableParallelExec: true, ParallelTxNum: 1}, nil, nil)
if err != nil {
t.Fatalf("Failed to recreate chain: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_sethead_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1997,7 +1997,7 @@ func testSetHeadWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme
config.SnapshotLimit = 256
config.SnapshotWait = true
}
chain, err := NewBlockChain(db, config, gspec, nil, engine, vm.Config{}, nil, nil)
chain, err := NewBlockChain(db, config, gspec, nil, engine, vm.Config{EnableParallelExec: true, ParallelTxNum: 1}, nil, nil)
if err != nil {
t.Fatalf("Failed to create chain: %v", err)
}
Expand Down
Loading

0 comments on commit ecaef53

Please sign in to comment.