Skip to content

Commit

Permalink
txdag: opt read txdag file and validation logic; (bnb-chain#26)
Browse files Browse the repository at this point in the history
* txdag: support new txdep resolve method;
pevm: avoid read txdag file when generating;

* pevm: support read txdag file in const size;

* txdag: reduce mem alloc and async resolve tx dependency;

---------

Co-authored-by: galaio <[email protected]>
  • Loading branch information
2 people authored and sunny2022da committed Aug 13, 2024
1 parent 9fc533b commit ff2f727
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 52 deletions.
129 changes: 102 additions & 27 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ type BlockChain struct {
parallelExecution bool
enableTxDAG bool
txDAGWriteCh chan TxDAGOutputItem
txDAGMapping map[uint64]types.TxDAG
txDAGReader *TxDAGFileReader
}

// NewBlockChain returns a fully initialised block chain using information
Expand Down Expand Up @@ -1063,6 +1063,9 @@ func (bc *BlockChain) stopWithoutSaving() {
// Stop stops the blockchain service. If any imports are currently in progress
// it will abort them using the procInterrupt.
func (bc *BlockChain) Stop() {
if bc.txDAGReader != nil {
bc.txDAGReader.Close()
}
bc.stopWithoutSaving()

// Ensure that the entirety of the state snapshot is journaled to disk.
Expand Down Expand Up @@ -2822,19 +2825,27 @@ func (bc *BlockChain) TxDAGEnabled() bool {
return bc.enableTxDAG
}

func (bc *BlockChain) SetupTxDAGGeneration(output string) {
func (bc *BlockChain) SetupTxDAGGeneration(output string, readFile bool) {
log.Info("node enable TxDAG feature", "output", output)
bc.enableTxDAG = true
if len(output) == 0 {
return
}
// read TxDAG file, and cache in mem
var err error
bc.txDAGMapping, err = readTxDAGMappingFromFile(output)
if err != nil {
log.Error("read TxDAG err", "err", err)
if readFile {
var err error
bc.txDAGReader, err = NewTxDAGFileReader(output)
if err != nil {
log.Error("read TxDAG err", "err", err)
}
// startup with latest block
curHeader := bc.CurrentHeader()
if curHeader != nil {
bc.txDAGReader.TxDAG(curHeader.Number.Uint64())
}
log.Info("load TxDAG from file", "output", output, "latest", bc.txDAGReader.Latest())
return
}
log.Info("load TxDAG from file", "output", output, "count", len(bc.txDAGMapping))

// write handler
go func() {
Expand Down Expand Up @@ -2878,34 +2889,98 @@ func writeTxDAGToFile(writeHandle *os.File, item TxDAGOutputItem) error {
return err
}

// TODO(galaio): support load with segments, every segment 100000 blocks?
func readTxDAGMappingFromFile(output string) (map[uint64]types.TxDAG, error) {
const TxDAGCacheSize = 200000

type TxDAGFileReader struct {
file *os.File
scanner *bufio.Scanner
cache map[uint64]types.TxDAG
latest uint64
lock sync.RWMutex
}

func NewTxDAGFileReader(output string) (*TxDAGFileReader, error) {
file, err := os.Open(output)
if err != nil {
return nil, err
}
defer file.Close()

mapping := make(map[uint64]types.TxDAG)
scanner := bufio.NewScanner(file)
for scanner.Scan() {
tokens := strings.Split(scanner.Text(), ",")
if len(tokens) != 2 {
return nil, errors.New("txDAG output contain wrong size")
}
num, err := strconv.Atoi(tokens[0])
if err != nil {
return nil, err
return &TxDAGFileReader{
file: file,
scanner: scanner,
}, nil
}

func (t *TxDAGFileReader) Close() {
t.lock.Lock()
defer t.lock.Unlock()
t.closeFile()
}

func (t *TxDAGFileReader) closeFile() {
if t.scanner != nil {
t.scanner = nil
}
if t.file != nil {
t.file.Close()
t.file = nil
}
}

func (t *TxDAGFileReader) Latest() uint64 {
t.lock.RLock()
defer t.lock.RUnlock()
return t.latest
}

func (t *TxDAGFileReader) TxDAG(expect uint64) types.TxDAG {
t.lock.Lock()
defer t.lock.Unlock()

if t.cache != nil && t.latest >= expect {
return t.cache[expect]
}

t.cache = make(map[uint64]types.TxDAG, TxDAGCacheSize)
counter := 0
for t.scanner != nil && t.scanner.Scan() {
if counter > TxDAGCacheSize {
break
}
enc, err := hex.DecodeString(tokens[1])
num, dag, err := readTxDAGItemFromLine(t.scanner.Text())
if err != nil {
return nil, err
log.Error("query TxDAG error found and read aborted", "err", err)
t.closeFile()
break
}
txDAG, err := types.DecodeTxDAG(enc)
if err != nil {
return nil, err
// skip lower blocks
if expect > num {
continue
}
mapping[uint64(num)] = txDAG
t.cache[num] = dag
t.latest = num
counter++
}

return t.cache[expect]
}

func readTxDAGItemFromLine(line string) (uint64, types.TxDAG, error) {
tokens := strings.Split(line, ",")
if len(tokens) != 2 {
return 0, nil, errors.New("txDAG output contain wrong size")
}
num, err := strconv.Atoi(tokens[0])
if err != nil {
return 0, nil, err
}
enc, err := hex.DecodeString(tokens[1])
if err != nil {
return 0, nil, err
}
txDAG, err := types.DecodeTxDAG(enc)
if err != nil {
return 0, nil, err
}
return mapping, nil
return uint64(num), txDAG, nil
}
19 changes: 13 additions & 6 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4740,21 +4740,28 @@ func TestTxDAGFile_ReadWrite(t *testing.T) {
except2 := map[uint64]types.TxDAG{
3: types.NewEmptyTxDAG(),
4: makeEmptyPlainTxDAG(4, types.NonDependentRelFlag, types.ExcludedTxFlag),
5: makeEmptyPlainTxDAG(5, types.NonDependentRelFlag, types.ExcludedTxFlag),
}
writeFile, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
require.NoError(t, err)
for num, dag := range except2 {
if num == 5 {
writeFile.WriteString("num,txdag\n")
continue
}
require.NoError(t, writeTxDAGToFile(writeFile, TxDAGOutputItem{blockNumber: num, txDAG: dag}))
}
writeFile.Close()

actual, err := readTxDAGMappingFromFile(path)
reader, err := NewTxDAGFileReader(path)
require.NoError(t, err)
for num, dag := range except {
require.Equal(t, dag, actual[num])
}
for num, dag := range except2 {
require.Equal(t, dag, actual[num])
for i := 0; i < 5; i++ {
num := uint64(i)
if except[num] != nil {
require.Equal(t, except[num], reader.TxDAG(num))
continue
}
require.Equal(t, except2[num], reader.TxDAG(num))
}
}

Expand Down
8 changes: 4 additions & 4 deletions core/parallel_state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,8 +736,8 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
if p.bc.enableTxDAG {
// TODO(galaio): load TxDAG from block
// or load cache txDAG from file
if txDAG == nil && len(p.bc.txDAGMapping) > 0 {
txDAG = p.bc.txDAGMapping[block.NumberU64()]
if txDAG == nil && p.bc.txDAGReader != nil {
txDAG = p.bc.txDAGReader.TxDAG(block.NumberU64())
}
if err := types.ValidateTxDAG(txDAG, len(block.Transactions())); err != nil {
log.Warn("pevm cannot apply wrong txdag",
Expand All @@ -763,8 +763,8 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat

// find the latestDepTx from TxDAG or latestExcludedTx
latestDepTx := -1
if txDAG != nil && txDAG.TxDep(i).Count() > 0 {
latestDepTx = int(txDAG.TxDep(i).TxIndexes[txDAG.TxDep(i).Count()-1])
if dep := types.TxDependency(txDAG, i); len(dep) > 0 {
latestDepTx = int(dep[len(dep)-1])
}
if latestDepTx < latestExcludedTx {
latestDepTx = latestExcludedTx
Expand Down
3 changes: 1 addition & 2 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2493,7 +2493,6 @@ func (s *StateDB) FinaliseRWSet() error {

// reset stateDB
s.rwSet = nil
s.stat = nil
return s.mvStates.FulfillRWSet(rwSet, stat)
}

Expand Down Expand Up @@ -2567,7 +2566,7 @@ func (s *StateDB) RecordSystemTxRWSet(index int) {
}
s.mvStates.FulfillRWSet(types.NewRWSet(types.StateVersion{
TxIndex: index,
}).WithExcludedTxFlag(), types.NewExeStat(index).WithSerialFlag())
}).WithExcludedTxFlag(), types.NewExeStat(index).WithExcludedTxFlag())
}

// copySet returns a deep-copied set.
Expand Down
46 changes: 38 additions & 8 deletions core/types/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
// ExcludedTxFlag indicates that the tx is excluded from TxDAG, user should execute them in sequence.
// These excluded transactions should be consecutive in the head or tail.
ExcludedTxFlag uint8 = 0x02
TxDepFlagMask = NonDependentRelFlag | ExcludedTxFlag
)

type TxDAG interface {
Expand Down Expand Up @@ -110,10 +111,37 @@ func ValidatePlainTxDAG(d TxDAG, txCnt int) error {
return fmt.Errorf("PlainTxDAG contains unordered dependency, tx: %v", i)
}
}
if dep.Flags != nil && *dep.Flags & ^TxDepFlagMask > 0 {
return fmt.Errorf("PlainTxDAG contains unknown flags, flags: %v", *dep.Flags)
}
}
return nil
}

func TxDependency(d TxDAG, i int) []uint64 {
if d == nil || i < 0 || i >= d.TxCount() {
return []uint64{}
}
dep := d.TxDep(i)
if dep.CheckFlag(ExcludedTxFlag) {
txs := make([]uint64, 0, i)
for j := 0; j < i; j++ {
txs = append(txs, uint64(j))
}
return txs
}
if dep.CheckFlag(NonDependentRelFlag) {
txs := make([]uint64, 0, d.TxCount()-dep.Count())
for j := 0; j < i; j++ {
if !dep.Exist(j) && j != i {
txs = append(txs, uint64(j))
}
}
return txs
}
return dep.TxIndexes
}

// EmptyTxDAG indicate that execute txs in sequence
// It means no transactions or need timely distribute transaction fees
// it only keep partial serial execution when tx cannot delay the distribution or just execute txs in sequence
Expand Down Expand Up @@ -438,7 +466,7 @@ func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) {

totalTxMeter.Mark(int64(txCount))
for i, path := range paths {
if stats[i].mustSerial {
if stats[i].excludedTx {
continue
}
if len(path) <= 1 {
Expand Down Expand Up @@ -499,7 +527,7 @@ func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) {
sPath []int
)
for i, stat := range stats {
if stat.mustSerial {
if stat.excludedTx {
continue
}
sPath = append(sPath, i)
Expand All @@ -512,13 +540,15 @@ func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) {

// travelTxDAGTargetPath will print target execution path
func travelTxDAGTargetPath(deps []TxDep, from uint64) []uint64 {
queue := make([]uint64, 0, len(deps))
path := make([]uint64, 0, len(deps))
var (
queue []uint64
path []uint64
)

queue = append(queue, from)
path = append(path, from)
for len(queue) > 0 {
next := make([]uint64, 0, len(deps))
var next []uint64
for _, i := range queue {
for _, dep := range deps[i].TxIndexes {
if !slices.Contains(path, dep) {
Expand All @@ -542,7 +572,7 @@ type ExeStat struct {
costTime time.Duration

// some flags
mustSerial bool
excludedTx bool
}

func NewExeStat(txIndex int) *ExeStat {
Expand All @@ -561,8 +591,8 @@ func (s *ExeStat) Done() *ExeStat {
return s
}

func (s *ExeStat) WithSerialFlag() *ExeStat {
s.mustSerial = true
func (s *ExeStat) WithExcludedTxFlag() *ExeStat {
s.excludedTx = true
return s
}

Expand Down
2 changes: 1 addition & 1 deletion core/types/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestEvaluateTxDAG(t *testing.T) {
stats[i].costTime = time.Duration(i)
txDep := dag.TxDep(i)
if txDep.CheckFlag(NonDependentRelFlag) {
stats[i].WithSerialFlag()
stats[i].WithExcludedTxFlag()
}
}
EvaluateTxDAGPerformance(dag, stats)
Expand Down
9 changes: 7 additions & 2 deletions core/types/mvstates.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,13 @@ func (s *MVStates) FulfillRWSet(rwSet *RWSet, stat *ExeStat) error {
checkRWSetInconsistent(index, k, rwSet.readSet, rwSet.writeSet)
}
}
s.resolveDepsCache(index, rwSet)
s.rwSets[index] = rwSet
// async resolve dependency
go func() {
s.lock.Lock()
defer s.lock.Unlock()
s.resolveDepsCache(index, rwSet)
}()
return nil
}

Expand Down Expand Up @@ -473,7 +478,7 @@ func (s *MVStates) ResolveTxDAG(txCnt int, gasFeeReceivers []common.Address) (Tx
txDAG.TxDeps[i].TxIndexes = deps
continue
}
// if tx deps larger than half of txs, then convert to relation1
// if tx deps larger than half of txs, then convert with NonDependentRelFlag
txDAG.TxDeps[i].SetFlag(NonDependentRelFlag)
for j := uint64(0); j < uint64(txCnt); j++ {
if !slices.Contains(deps, j) && j != uint64(i) {
Expand Down
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
return nil, err
}
if config.EnableParallelTxDAG {
eth.blockchain.SetupTxDAGGeneration(config.ParallelTxDAGFile)
eth.blockchain.SetupTxDAGGeneration(config.ParallelTxDAGFile, config.ParallelTxMode)
}
if chainConfig := eth.blockchain.Config(); chainConfig.Optimism != nil { // config.Genesis.Config.ChainID cannot be used because it's based on CLI flags only, thus default to mainnet L1
config.NetworkId = chainConfig.ChainID.Uint64() // optimism defaults eth network ID to chain ID
Expand Down
2 changes: 1 addition & 1 deletion tests/block_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (t *BlockTest) Run(snapshotter bool, scheme string, tracer vm.EVMLogger, po
}
defer chain.Stop()
if len(dagFile) > 0 {
chain.SetupTxDAGGeneration(dagFile)
chain.SetupTxDAGGeneration(dagFile, enableParallel)
}
validBlocks, err := t.insertBlocks(chain)
if err != nil {
Expand Down

0 comments on commit ff2f727

Please sign in to comment.