From ff2f72752c6cff1166161928a768bc61fdeb989f Mon Sep 17 00:00:00 2001 From: galaio <12880651+galaio@users.noreply.github.com> Date: Thu, 8 Aug 2024 10:29:08 +0800 Subject: [PATCH] txdag: opt read txdag file and validation logic; (#26) * txdag: support new txdep resolve method; pevm: avoid read txdag file when generating; * pevm: support read txdag file in const size; * txdag: reduce mem alloc and async resolve tx dependency; --------- Co-authored-by: galaio --- core/blockchain.go | 129 ++++++++++++++++++++++++------- core/blockchain_test.go | 19 +++-- core/parallel_state_processor.go | 8 +- core/state/statedb.go | 3 +- core/types/dag.go | 46 +++++++++-- core/types/dag_test.go | 2 +- core/types/mvstates.go | 9 ++- eth/backend.go | 2 +- tests/block_test_util.go | 2 +- 9 files changed, 168 insertions(+), 52 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 4b6cddcaab..2775c6f1f8 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -306,7 +306,7 @@ type BlockChain struct { parallelExecution bool enableTxDAG bool txDAGWriteCh chan TxDAGOutputItem - txDAGMapping map[uint64]types.TxDAG + txDAGReader *TxDAGFileReader } // NewBlockChain returns a fully initialised block chain using information @@ -1063,6 +1063,9 @@ func (bc *BlockChain) stopWithoutSaving() { // Stop stops the blockchain service. If any imports are currently in progress // it will abort them using the procInterrupt. func (bc *BlockChain) Stop() { + if bc.txDAGReader != nil { + bc.txDAGReader.Close() + } bc.stopWithoutSaving() // Ensure that the entirety of the state snapshot is journaled to disk. @@ -2822,19 +2825,27 @@ func (bc *BlockChain) TxDAGEnabled() bool { return bc.enableTxDAG } -func (bc *BlockChain) SetupTxDAGGeneration(output string) { +func (bc *BlockChain) SetupTxDAGGeneration(output string, readFile bool) { log.Info("node enable TxDAG feature", "output", output) bc.enableTxDAG = true if len(output) == 0 { return } // read TxDAG file, and cache in mem - var err error - bc.txDAGMapping, err = readTxDAGMappingFromFile(output) - if err != nil { - log.Error("read TxDAG err", "err", err) + if readFile { + var err error + bc.txDAGReader, err = NewTxDAGFileReader(output) + if err != nil { + log.Error("read TxDAG err", "err", err) + } + // startup with latest block + curHeader := bc.CurrentHeader() + if curHeader != nil { + bc.txDAGReader.TxDAG(curHeader.Number.Uint64()) + } + log.Info("load TxDAG from file", "output", output, "latest", bc.txDAGReader.Latest()) + return } - log.Info("load TxDAG from file", "output", output, "count", len(bc.txDAGMapping)) // write handler go func() { @@ -2878,34 +2889,98 @@ func writeTxDAGToFile(writeHandle *os.File, item TxDAGOutputItem) error { return err } -// TODO(galaio): support load with segments, every segment 100000 blocks? -func readTxDAGMappingFromFile(output string) (map[uint64]types.TxDAG, error) { +const TxDAGCacheSize = 200000 + +type TxDAGFileReader struct { + file *os.File + scanner *bufio.Scanner + cache map[uint64]types.TxDAG + latest uint64 + lock sync.RWMutex +} + +func NewTxDAGFileReader(output string) (*TxDAGFileReader, error) { file, err := os.Open(output) if err != nil { return nil, err } - defer file.Close() - - mapping := make(map[uint64]types.TxDAG) scanner := bufio.NewScanner(file) - for scanner.Scan() { - tokens := strings.Split(scanner.Text(), ",") - if len(tokens) != 2 { - return nil, errors.New("txDAG output contain wrong size") - } - num, err := strconv.Atoi(tokens[0]) - if err != nil { - return nil, err + return &TxDAGFileReader{ + file: file, + scanner: scanner, + }, nil +} + +func (t *TxDAGFileReader) Close() { + t.lock.Lock() + defer t.lock.Unlock() + t.closeFile() +} + +func (t *TxDAGFileReader) closeFile() { + if t.scanner != nil { + t.scanner = nil + } + if t.file != nil { + t.file.Close() + t.file = nil + } +} + +func (t *TxDAGFileReader) Latest() uint64 { + t.lock.RLock() + defer t.lock.RUnlock() + return t.latest +} + +func (t *TxDAGFileReader) TxDAG(expect uint64) types.TxDAG { + t.lock.Lock() + defer t.lock.Unlock() + + if t.cache != nil && t.latest >= expect { + return t.cache[expect] + } + + t.cache = make(map[uint64]types.TxDAG, TxDAGCacheSize) + counter := 0 + for t.scanner != nil && t.scanner.Scan() { + if counter > TxDAGCacheSize { + break } - enc, err := hex.DecodeString(tokens[1]) + num, dag, err := readTxDAGItemFromLine(t.scanner.Text()) if err != nil { - return nil, err + log.Error("query TxDAG error found and read aborted", "err", err) + t.closeFile() + break } - txDAG, err := types.DecodeTxDAG(enc) - if err != nil { - return nil, err + // skip lower blocks + if expect > num { + continue } - mapping[uint64(num)] = txDAG + t.cache[num] = dag + t.latest = num + counter++ + } + + return t.cache[expect] +} + +func readTxDAGItemFromLine(line string) (uint64, types.TxDAG, error) { + tokens := strings.Split(line, ",") + if len(tokens) != 2 { + return 0, nil, errors.New("txDAG output contain wrong size") + } + num, err := strconv.Atoi(tokens[0]) + if err != nil { + return 0, nil, err + } + enc, err := hex.DecodeString(tokens[1]) + if err != nil { + return 0, nil, err + } + txDAG, err := types.DecodeTxDAG(enc) + if err != nil { + return 0, nil, err } - return mapping, nil + return uint64(num), txDAG, nil } diff --git a/core/blockchain_test.go b/core/blockchain_test.go index ead47a6060..37c15cc768 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -4740,21 +4740,28 @@ func TestTxDAGFile_ReadWrite(t *testing.T) { except2 := map[uint64]types.TxDAG{ 3: types.NewEmptyTxDAG(), 4: makeEmptyPlainTxDAG(4, types.NonDependentRelFlag, types.ExcludedTxFlag), + 5: makeEmptyPlainTxDAG(5, types.NonDependentRelFlag, types.ExcludedTxFlag), } writeFile, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) require.NoError(t, err) for num, dag := range except2 { + if num == 5 { + writeFile.WriteString("num,txdag\n") + continue + } require.NoError(t, writeTxDAGToFile(writeFile, TxDAGOutputItem{blockNumber: num, txDAG: dag})) } writeFile.Close() - actual, err := readTxDAGMappingFromFile(path) + reader, err := NewTxDAGFileReader(path) require.NoError(t, err) - for num, dag := range except { - require.Equal(t, dag, actual[num]) - } - for num, dag := range except2 { - require.Equal(t, dag, actual[num]) + for i := 0; i < 5; i++ { + num := uint64(i) + if except[num] != nil { + require.Equal(t, except[num], reader.TxDAG(num)) + continue + } + require.Equal(t, except2[num], reader.TxDAG(num)) } } diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go index 26f0637c37..56172e578f 100644 --- a/core/parallel_state_processor.go +++ b/core/parallel_state_processor.go @@ -736,8 +736,8 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat if p.bc.enableTxDAG { // TODO(galaio): load TxDAG from block // or load cache txDAG from file - if txDAG == nil && len(p.bc.txDAGMapping) > 0 { - txDAG = p.bc.txDAGMapping[block.NumberU64()] + if txDAG == nil && p.bc.txDAGReader != nil { + txDAG = p.bc.txDAGReader.TxDAG(block.NumberU64()) } if err := types.ValidateTxDAG(txDAG, len(block.Transactions())); err != nil { log.Warn("pevm cannot apply wrong txdag", @@ -763,8 +763,8 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat // find the latestDepTx from TxDAG or latestExcludedTx latestDepTx := -1 - if txDAG != nil && txDAG.TxDep(i).Count() > 0 { - latestDepTx = int(txDAG.TxDep(i).TxIndexes[txDAG.TxDep(i).Count()-1]) + if dep := types.TxDependency(txDAG, i); len(dep) > 0 { + latestDepTx = int(dep[len(dep)-1]) } if latestDepTx < latestExcludedTx { latestDepTx = latestExcludedTx diff --git a/core/state/statedb.go b/core/state/statedb.go index e48473467a..63e9870640 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -2493,7 +2493,6 @@ func (s *StateDB) FinaliseRWSet() error { // reset stateDB s.rwSet = nil - s.stat = nil return s.mvStates.FulfillRWSet(rwSet, stat) } @@ -2567,7 +2566,7 @@ func (s *StateDB) RecordSystemTxRWSet(index int) { } s.mvStates.FulfillRWSet(types.NewRWSet(types.StateVersion{ TxIndex: index, - }).WithExcludedTxFlag(), types.NewExeStat(index).WithSerialFlag()) + }).WithExcludedTxFlag(), types.NewExeStat(index).WithExcludedTxFlag()) } // copySet returns a deep-copied set. diff --git a/core/types/dag.go b/core/types/dag.go index 76d543aff6..fb6e374bd2 100644 --- a/core/types/dag.go +++ b/core/types/dag.go @@ -25,6 +25,7 @@ var ( // ExcludedTxFlag indicates that the tx is excluded from TxDAG, user should execute them in sequence. // These excluded transactions should be consecutive in the head or tail. ExcludedTxFlag uint8 = 0x02 + TxDepFlagMask = NonDependentRelFlag | ExcludedTxFlag ) type TxDAG interface { @@ -110,10 +111,37 @@ func ValidatePlainTxDAG(d TxDAG, txCnt int) error { return fmt.Errorf("PlainTxDAG contains unordered dependency, tx: %v", i) } } + if dep.Flags != nil && *dep.Flags & ^TxDepFlagMask > 0 { + return fmt.Errorf("PlainTxDAG contains unknown flags, flags: %v", *dep.Flags) + } } return nil } +func TxDependency(d TxDAG, i int) []uint64 { + if d == nil || i < 0 || i >= d.TxCount() { + return []uint64{} + } + dep := d.TxDep(i) + if dep.CheckFlag(ExcludedTxFlag) { + txs := make([]uint64, 0, i) + for j := 0; j < i; j++ { + txs = append(txs, uint64(j)) + } + return txs + } + if dep.CheckFlag(NonDependentRelFlag) { + txs := make([]uint64, 0, d.TxCount()-dep.Count()) + for j := 0; j < i; j++ { + if !dep.Exist(j) && j != i { + txs = append(txs, uint64(j)) + } + } + return txs + } + return dep.TxIndexes +} + // EmptyTxDAG indicate that execute txs in sequence // It means no transactions or need timely distribute transaction fees // it only keep partial serial execution when tx cannot delay the distribution or just execute txs in sequence @@ -438,7 +466,7 @@ func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) { totalTxMeter.Mark(int64(txCount)) for i, path := range paths { - if stats[i].mustSerial { + if stats[i].excludedTx { continue } if len(path) <= 1 { @@ -499,7 +527,7 @@ func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) { sPath []int ) for i, stat := range stats { - if stat.mustSerial { + if stat.excludedTx { continue } sPath = append(sPath, i) @@ -512,13 +540,15 @@ func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) { // travelTxDAGTargetPath will print target execution path func travelTxDAGTargetPath(deps []TxDep, from uint64) []uint64 { - queue := make([]uint64, 0, len(deps)) - path := make([]uint64, 0, len(deps)) + var ( + queue []uint64 + path []uint64 + ) queue = append(queue, from) path = append(path, from) for len(queue) > 0 { - next := make([]uint64, 0, len(deps)) + var next []uint64 for _, i := range queue { for _, dep := range deps[i].TxIndexes { if !slices.Contains(path, dep) { @@ -542,7 +572,7 @@ type ExeStat struct { costTime time.Duration // some flags - mustSerial bool + excludedTx bool } func NewExeStat(txIndex int) *ExeStat { @@ -561,8 +591,8 @@ func (s *ExeStat) Done() *ExeStat { return s } -func (s *ExeStat) WithSerialFlag() *ExeStat { - s.mustSerial = true +func (s *ExeStat) WithExcludedTxFlag() *ExeStat { + s.excludedTx = true return s } diff --git a/core/types/dag_test.go b/core/types/dag_test.go index 499b914340..6edb10cc3b 100644 --- a/core/types/dag_test.go +++ b/core/types/dag_test.go @@ -41,7 +41,7 @@ func TestEvaluateTxDAG(t *testing.T) { stats[i].costTime = time.Duration(i) txDep := dag.TxDep(i) if txDep.CheckFlag(NonDependentRelFlag) { - stats[i].WithSerialFlag() + stats[i].WithExcludedTxFlag() } } EvaluateTxDAGPerformance(dag, stats) diff --git a/core/types/mvstates.go b/core/types/mvstates.go index 9816c6ecfa..c789dbeabc 100644 --- a/core/types/mvstates.go +++ b/core/types/mvstates.go @@ -362,8 +362,13 @@ func (s *MVStates) FulfillRWSet(rwSet *RWSet, stat *ExeStat) error { checkRWSetInconsistent(index, k, rwSet.readSet, rwSet.writeSet) } } - s.resolveDepsCache(index, rwSet) s.rwSets[index] = rwSet + // async resolve dependency + go func() { + s.lock.Lock() + defer s.lock.Unlock() + s.resolveDepsCache(index, rwSet) + }() return nil } @@ -473,7 +478,7 @@ func (s *MVStates) ResolveTxDAG(txCnt int, gasFeeReceivers []common.Address) (Tx txDAG.TxDeps[i].TxIndexes = deps continue } - // if tx deps larger than half of txs, then convert to relation1 + // if tx deps larger than half of txs, then convert with NonDependentRelFlag txDAG.TxDeps[i].SetFlag(NonDependentRelFlag) for j := uint64(0); j < uint64(txCnt); j++ { if !slices.Contains(deps, j) && j != uint64(i) { diff --git a/eth/backend.go b/eth/backend.go index bfb9a8c86c..9ca52bfa57 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -270,7 +270,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { return nil, err } if config.EnableParallelTxDAG { - eth.blockchain.SetupTxDAGGeneration(config.ParallelTxDAGFile) + eth.blockchain.SetupTxDAGGeneration(config.ParallelTxDAGFile, config.ParallelTxMode) } if chainConfig := eth.blockchain.Config(); chainConfig.Optimism != nil { // config.Genesis.Config.ChainID cannot be used because it's based on CLI flags only, thus default to mainnet L1 config.NetworkId = chainConfig.ChainID.Uint64() // optimism defaults eth network ID to chain ID diff --git a/tests/block_test_util.go b/tests/block_test_util.go index 837d257ef0..609f8c98e9 100644 --- a/tests/block_test_util.go +++ b/tests/block_test_util.go @@ -159,7 +159,7 @@ func (t *BlockTest) Run(snapshotter bool, scheme string, tracer vm.EVMLogger, po } defer chain.Stop() if len(dagFile) > 0 { - chain.SetupTxDAGGeneration(dagFile) + chain.SetupTxDAGGeneration(dagFile, enableParallel) } validBlocks, err := t.insertBlocks(chain) if err != nil {