Skip to content

Commit

Permalink
txdag: opt txdag logic & clean todos; (bnb-chain#17)
Browse files Browse the repository at this point in the history
txdag: opt rw record flag;

txdag: opt some logic;

Co-authored-by: galaio <[email protected]>
  • Loading branch information
2 people authored and sunny2022da committed Aug 13, 2024
1 parent 5295324 commit b2e4403
Show file tree
Hide file tree
Showing 8 changed files with 399 additions and 259 deletions.
2 changes: 1 addition & 1 deletion core/state/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (ch resetObjectChange) revert(dber StateDBer) {

if !ch.prevdestruct {
s.snapParallelLock.Lock()
s.deleteStateObjectsDestruct(ch.prev.address)
s.removeStateObjectsDestruct(ch.prev.address)
s.snapParallelLock.Unlock()
}
if ch.prevAccount != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (s *stateObject) GetCommittedState(key common.Hash) common.Hash {
// 2) we don't have new values, and can deliver empty response back
//if _, destructed := s.db.stateObjectsDestruct[s.address]; destructed {
s.db.snapParallelLock.RLock()
if _, destructed := s.db.queryStateObjectsDestruct(s.address); destructed { // fixme: use sync.Map, instead of RWMutex?
if _, destructed := s.db.getStateObjectsDegetstruct(s.address); destructed { // fixme: use sync.Map, instead of RWMutex?
s.db.snapParallelLock.RUnlock()
return common.Hash{}
}
Expand Down
32 changes: 14 additions & 18 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,9 @@ type StateDB struct {
logSize uint

// parallel EVM related
rwSet *types.RWSet
mvStates *types.MVStates
stat *types.ExeStat
rwRecordFlag bool
rwSet *types.RWSet
mvStates *types.MVStates
stat *types.ExeStat

// Preimages occurred seen by VM in the scope of block.
preimages map[common.Hash][]byte
Expand Down Expand Up @@ -686,8 +685,8 @@ func (s *StateDB) SetStorage(addr common.Address, storage map[common.Hash]common
//
// TODO(rjl493456442) this function should only be supported by 'unwritable'
// state and all mutations made should all be discarded afterwards.
if _, ok := s.queryStateObjectsDestruct(addr); !ok {
s.tagStateObjectsDestruct(addr, nil)
if _, ok := s.getStateObjectsDegetstruct(addr); !ok {
s.setStateObjectsDestruct(addr, nil)
}
object := s.GetOrNewStateObject(addr)
for k, v := range storage {
Expand Down Expand Up @@ -1025,9 +1024,9 @@ func (s *StateDB) createObject(addr common.Address) (newobj *stateObject) {
// account and storage data should be cleared as well. Note, it must
// be done here, otherwise the destruction event of "original account"
// will be lost.
_, prevdestruct := s.queryStateObjectsDestruct(prev.address)
_, prevdestruct := s.getStateObjectsDegetstruct(prev.address)
if !prevdestruct {
s.tagStateObjectsDestruct(prev.address, prev.origin)
s.setStateObjectsDestruct(prev.address, prev.origin)
}
// There may be some cached account/storage data already since IntermediateRoot
// will be called for each transaction before byzantium fork which will always
Expand Down Expand Up @@ -2373,7 +2372,6 @@ func (s *StateDB) BeforeTxTransition() {
s.rwSet = types.NewRWSet(types.StateVersion{
TxIndex: s.txIndex,
})
s.rwRecordFlag = true
}

func (s *StateDB) BeginTxStat(index int) {
Expand Down Expand Up @@ -2405,10 +2403,9 @@ func (s *StateDB) RecordRead(key types.RWKey, val interface{}) {
if s.isParallel && s.parallel.isSlotDB {
return
}
if !s.rwRecordFlag {
if s.rwSet == nil || s.rwSet.RWRecordDone() {
return
}
// TODO: read from MVStates, record with ver
s.rwSet.RecordRead(key, types.StateVersion{
TxIndex: -1,
}, val)
Expand All @@ -2418,7 +2415,7 @@ func (s *StateDB) RecordWrite(key types.RWKey, val interface{}) {
if s.isParallel && s.parallel.isSlotDB {
return
}
if !s.rwRecordFlag {
if s.rwSet == nil || s.rwSet.RWRecordDone() {
return
}
s.rwSet.RecordWrite(key, val)
Expand All @@ -2430,14 +2427,13 @@ func (s *StateDB) ResetMVStates(txCount int) {
}
s.mvStates = types.NewMVStates(txCount)
s.rwSet = nil
s.rwRecordFlag = false
}

func (s *StateDB) FinaliseRWSet() error {
if s.isParallel && s.parallel.isSlotDB {
return nil
}
if !s.rwRecordFlag {
if s.rwSet == nil || s.rwSet.RWRecordDone() {
return nil
}
if metrics.EnabledExpensive {
Expand Down Expand Up @@ -2475,11 +2471,11 @@ func (s *StateDB) FinaliseRWSet() error {
}
}

s.rwRecordFlag = false
s.rwSet.SetRWRecordDone()
return s.mvStates.FulfillRWSet(s.rwSet, s.stat)
}

func (s *StateDB) queryStateObjectsDestruct(addr common.Address) (*types.StateAccount, bool) {
func (s *StateDB) getStateObjectsDegetstruct(addr common.Address) (*types.StateAccount, bool) {
if !(s.isParallel && s.parallel.isSlotDB) {
if acc, ok := s.stateObjectsDestructDirty[addr]; ok {
return acc, ok
Expand All @@ -2489,7 +2485,7 @@ func (s *StateDB) queryStateObjectsDestruct(addr common.Address) (*types.StateAc
return acc, ok
}

func (s *StateDB) tagStateObjectsDestruct(addr common.Address, acc *types.StateAccount) {
func (s *StateDB) setStateObjectsDestruct(addr common.Address, acc *types.StateAccount) {
if !(s.isParallel && s.parallel.isSlotDB) {
s.stateObjectsDestructDirty[addr] = acc
return
Expand All @@ -2498,7 +2494,7 @@ func (s *StateDB) tagStateObjectsDestruct(addr common.Address, acc *types.StateA
return
}

func (s *StateDB) deleteStateObjectsDestruct(addr common.Address) {
func (s *StateDB) removeStateObjectsDestruct(addr common.Address) {
if !(s.isParallel && s.parallel.isSlotDB) {
delete(s.stateObjectsDestructDirty, addr)
return
Expand Down
2 changes: 1 addition & 1 deletion core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles(), withdrawals)

if p.bc.enableTxDAG {
// TODO(galaio): append dag into block body, TxDAGPerformance will print metrics when profile is enabled
// compare input TxDAG when it enable in consensus
dag, extraStats := statedb.ResolveTxDAG([]common.Address{context.Coinbase, params.OptimismBaseFeeRecipient, params.OptimismL1FeeRecipient})
// TODO(galaio): check TxDAG correctness?
log.Debug("Process TxDAG result", "block", block.NumberU64(), "txDAG", dag)
if metrics.EnabledExpensive {
types.EvaluateTxDAGPerformance(dag, extraStats)
Expand Down
48 changes: 34 additions & 14 deletions core/types/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type TxDAG interface {

// TxCount return tx count
TxCount() int

// SetTxDep at the last one
SetTxDep(int, TxDep) error
}

func EncodeTxDAG(dag TxDAG) ([]byte, error) {
Expand Down Expand Up @@ -99,6 +102,10 @@ func (d *EmptyTxDAG) TxCount() int {
return 0
}

func (d *EmptyTxDAG) SetTxDep(int, TxDep) error {
return nil
}

func (d *EmptyTxDAG) String() string {
return "None"
}
Expand Down Expand Up @@ -129,6 +136,18 @@ func (d *PlainTxDAG) TxCount() int {
return len(d.TxDeps)
}

func (d *PlainTxDAG) SetTxDep(i int, dep TxDep) error {
if i < 0 || i > len(d.TxDeps) {
return fmt.Errorf("SetTxDep with wrong index: %d", i)
}
if i < len(d.TxDeps) {
d.TxDeps[i] = dep
return nil
}
d.TxDeps = append(d.TxDeps, dep)
return nil
}

func NewPlainTxDAG(txLen int) *PlainTxDAG {
return &PlainTxDAG{
TxDeps: make([]TxDep, txLen),
Expand Down Expand Up @@ -285,23 +304,23 @@ func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) {
// It assumes that there is no parallel thread limit
txCount := dag.TxCount()
var (
maxGasIndex int
maxGas uint64
maxTimeIndex int
maxTime time.Duration
txTimes = make([]time.Duration, txCount)
txGases = make([]uint64, txCount)
txReads = make([]int, txCount)
noDepdencyCount int
maxGasIndex int
maxGas uint64
maxTimeIndex int
maxTime time.Duration
txTimes = make([]time.Duration, txCount)
txGases = make([]uint64, txCount)
txReads = make([]int, txCount)
noDepCnt int
)

totalTxMeter.Mark(int64(txCount))
for i, path := range paths {
if stats[i].mustSerialFlag {
if stats[i].mustSerial {
continue
}
if len(path) <= 1 {
noDepdencyCount++
noDepCnt++
totalNoDepMeter.Mark(1)
}
if len(path) <= 3 {
Expand Down Expand Up @@ -358,7 +377,7 @@ func EvaluateTxDAGPerformance(dag TxDAG, stats map[int]*ExeStat) {
sPath []int
)
for i, stat := range stats {
if stat.mustSerialFlag {
if stat.mustSerial {
continue
}
sPath = append(sPath, i)
Expand Down Expand Up @@ -399,8 +418,9 @@ type ExeStat struct {
readCount int
startTime time.Time
costTime time.Duration
// TODO: consider system tx, gas fee issues, may need to use different flag
mustSerialFlag bool

// some flags
mustSerial bool
}

func NewExeStat(txIndex int) *ExeStat {
Expand All @@ -420,7 +440,7 @@ func (s *ExeStat) Done() *ExeStat {
}

func (s *ExeStat) WithSerialFlag() *ExeStat {
s.mustSerialFlag = true
s.mustSerial = true
return s
}

Expand Down
Loading

0 comments on commit b2e4403

Please sign in to comment.