Skip to content

Commit

Permalink
add dag support
Browse files Browse the repository at this point in the history
  • Loading branch information
andyzhang2023 committed Nov 20, 2024
1 parent 340cacd commit 2fc31ce
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 9 deletions.
2 changes: 1 addition & 1 deletion core/pevm_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (p *PEVMProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
// log.Debug("pevm confirm", "txIndex", pr.txReq.txIndex)
// return p.confirmTxResult(statedb, gp, pr)
//}, p.unorderedMerge)
err, txIndex := newPEVMScheduler(p.allTxReqs).Run(func(pr *PEVMTxRequest) (res *PEVMTxResult) {
err, txIndex := newPEVMScheduler(p.allTxReqs, txDAG).Run(func(pr *PEVMTxRequest) (res *PEVMTxResult) {
defer func(t0 time.Time) {
atomic.AddInt64(&executeDurations, time.Since(t0).Nanoseconds())
}(time.Now())
Expand Down
45 changes: 37 additions & 8 deletions core/pevm_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ type PEVMScheduler struct {
all []*PEVMJob
}

func newPEVMScheduler(allTx []*PEVMTxRequest) *PEVMScheduler {
func newPEVMScheduler(allTx []*PEVMTxRequest, txdag types.TxDAG) *PEVMScheduler {
all := make([]*PEVMJob, len(allTx))
for i, tx := range allTx {
all[i] = &PEVMJob{tx: tx}
all[i] = &PEVMJob{tx: tx, dag: txdag}
}
return &PEVMScheduler{all: all}
}
Expand Down Expand Up @@ -91,7 +91,7 @@ func (ps *PEVMScheduler) Run(execute func(*PEVMTxRequest) *PEVMTxResult, confirm
for merged < int32(len(ps.all))-1 && failed == nil {
for jIdx := merged + 1; jIdx < allNum; jIdx++ {
job := ps.all[jIdx]
job.execute(execute)
job.execute(execute, ps.all)
merge()
}
}
Expand Down Expand Up @@ -120,30 +120,59 @@ func (job *PEVMJob) unlock() {

func (job *PEVMJob) dependencies() (txs []int, all bool) {
//return all dependencies of this job
return nil, false
if job.dag == nil {
return nil, false
}
dep := job.dag.TxDep(job.tx.txIndex)
switch true {
case dep != nil && dep.CheckFlag(types.ExcludedTxFlag),
dep != nil && dep.CheckFlag(types.NonDependentRelFlag):
// excluted tx, need to wait all txs to be merged
return nil, true

case dep == nil || len(dep.TxIndexes) == 0:
// dependent on none
return nil, false

case dep != nil && len(dep.TxIndexes) > 0:
// dependent on others
// findout the correct level that the tx should be put
depIndexes := make([]int, len(dep.TxIndexes))
for i, txIndex := range dep.TxIndexes {
depIndexes[i] = int(txIndex)
}
return depIndexes, false

default:
panic("unexpected case")
}
}

func (job *PEVMJob) readyToExecute(all []*PEVMJob) bool {
// all dependiences is merged
deps, execluded := job.dependencies()
// the execluted one
if execluded {
// @TODO
// should wait all txs[:jIdx-1] to be merged
return false
for i := job.tx.txIndex - 1; i >= 0; i-- {
if !all[i].merged {
return false
}
}
return true
} else if len(deps) != 0 && !allMerged(all, deps) {
// should wait all dependiences to be merged
return false
}
return true
}

func (job *PEVMJob) execute(execute func(*PEVMTxRequest) *PEVMTxResult) bool {
func (job *PEVMJob) execute(execute func(*PEVMTxRequest) *PEVMTxResult, all []*PEVMJob) bool {
if !job.lock() {
return false
}
defer job.unlock()
if !job.executed && job.readyToExecute(nil) {
if !job.executed && job.readyToExecute(all) {
// execute the job
job.res = execute(job.tx)
job.executed = true
Expand Down

0 comments on commit 2fc31ce

Please sign in to comment.