From 2fc31ce2647211d47391f5844c3bf17a647129d8 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Wed, 20 Nov 2024 16:59:58 +0800 Subject: [PATCH] add dag support --- core/pevm_processor.go | 2 +- core/pevm_scheduler.go | 45 ++++++++++++++++++++++++++++++++++-------- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/core/pevm_processor.go b/core/pevm_processor.go index f21dfb419..0207edca7 100644 --- a/core/pevm_processor.go +++ b/core/pevm_processor.go @@ -281,7 +281,7 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg // log.Debug("pevm confirm", "txIndex", pr.txReq.txIndex) // return p.confirmTxResult(statedb, gp, pr) //}, p.unorderedMerge) - err, txIndex := newPEVMScheduler(p.allTxReqs).Run(func(pr *PEVMTxRequest) (res *PEVMTxResult) { + err, txIndex := newPEVMScheduler(p.allTxReqs, txDAG).Run(func(pr *PEVMTxRequest) (res *PEVMTxResult) { defer func(t0 time.Time) { atomic.AddInt64(&executeDurations, time.Since(t0).Nanoseconds()) }(time.Now()) diff --git a/core/pevm_scheduler.go b/core/pevm_scheduler.go index 013d15fe2..6dc70cdef 100644 --- a/core/pevm_scheduler.go +++ b/core/pevm_scheduler.go @@ -21,10 +21,10 @@ type PEVMScheduler struct { all []*PEVMJob } -func newPEVMScheduler(allTx []*PEVMTxRequest) *PEVMScheduler { +func newPEVMScheduler(allTx []*PEVMTxRequest, txdag types.TxDAG) *PEVMScheduler { all := make([]*PEVMJob, len(allTx)) for i, tx := range allTx { - all[i] = &PEVMJob{tx: tx} + all[i] = &PEVMJob{tx: tx, dag: txdag} } return &PEVMScheduler{all: all} } @@ -91,7 +91,7 @@ func (ps *PEVMScheduler) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm for merged < int32(len(ps.all))-1 && failed == nil { for jIdx := merged + 1; jIdx < allNum; jIdx++ { job := ps.all[jIdx] - job.execute(execute) + job.execute(execute, ps.all) merge() } } @@ -120,7 +120,32 @@ func (job *PEVMJob) unlock() { func (job *PEVMJob) dependencies() (txs []int, all bool) { //return all dependencies of this job - return nil, false + if job.dag == nil { + return nil, false + } + dep := job.dag.TxDep(job.tx.txIndex) + switch true { + case dep != nil && dep.CheckFlag(types.ExcludedTxFlag), + dep != nil && dep.CheckFlag(types.NonDependentRelFlag): + // excluted tx, need to wait all txs to be merged + return nil, true + + case dep == nil || len(dep.TxIndexes) == 0: + // dependent on none + return nil, false + + case dep != nil && len(dep.TxIndexes) > 0: + // dependent on others + // findout the correct level that the tx should be put + depIndexes := make([]int, len(dep.TxIndexes)) + for i, txIndex := range dep.TxIndexes { + depIndexes[i] = int(txIndex) + } + return depIndexes, false + + default: + panic("unexpected case") + } } func (job *PEVMJob) readyToExecute(all []*PEVMJob) bool { @@ -128,9 +153,13 @@ func (job *PEVMJob) readyToExecute(all []*PEVMJob) bool { deps, execluded := job.dependencies() // the execluted one if execluded { - // @TODO // should wait all txs[:jIdx-1] to be merged - return false + for i := job.tx.txIndex - 1; i >= 0; i-- { + if !all[i].merged { + return false + } + } + return true } else if len(deps) != 0 && !allMerged(all, deps) { // should wait all dependiences to be merged return false @@ -138,12 +167,12 @@ func (job *PEVMJob) readyToExecute(all []*PEVMJob) bool { return true } -func (job *PEVMJob) execute(execute func(*PEVMTxRequest) *PEVMTxResult) bool { +func (job *PEVMJob) execute(execute func(*PEVMTxRequest) *PEVMTxResult, all []*PEVMJob) bool { if !job.lock() { return false } defer job.unlock() - if !job.executed && job.readyToExecute(nil) { + if !job.executed && job.readyToExecute(all) { // execute the job job.res = execute(job.tx) job.executed = true