Skip to content

Commit

Permalink
mvstates: fix oom issue when mining is enabled;
Browse files Browse the repository at this point in the history
mvstates: opt async dep generation;
mvstates: opt resolve dep logic;
mvstates: fix async dep gen deadlock issue;
miner: support record sysytem tx rwset;
miner: opt txdag enable checking;
txdag: fix system tx finalise issue;
mvstate: using pending writes to accelerate txdag generation;
txdag: test snappy compress ratio;
txdag: add more bench tests;
txdag: reduce mem alloc and async resolve tx dependency;
txdag: add excluded flag;
mvstates: generate txdag with excluded flag;
txdag: support multi flags, and supported in pevm;
txdag: opt TxDAG rwset collecting & generating;
txdag: opt txdag encoding, reduce rlp size
  • Loading branch information
galaio committed Sep 23, 2024
1 parent b5514e7 commit ec9ec73
Show file tree
Hide file tree
Showing 10 changed files with 887 additions and 300 deletions.
17 changes: 16 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1920,6 +1920,20 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
vtime := time.Since(vstart)
proctime := time.Since(start) // processing + validation

if bc.enableTxDAG {
// compare input TxDAG when it enable in consensus
dag, err := statedb.ResolveTxDAG(len(block.Transactions()), []common.Address{block.Coinbase(), params.OptimismBaseFeeRecipient, params.OptimismL1FeeRecipient})
if err == nil {
// TODO(galaio): check TxDAG correctness?
log.Debug("Process TxDAG result", "block", block.NumberU64(), "txDAG", dag)
if metrics.EnabledExpensive {
go types.EvaluateTxDAGPerformance(dag, statedb.ResolveStats())
}
} else {
log.Error("ResolveTxDAG err", "block", block.NumberU64(), "tx", len(block.Transactions()), "err", err)
}
}

// Update the metrics touched during block processing and validation
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing)
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete(in processing)
Expand Down Expand Up @@ -2635,10 +2649,11 @@ func (bc *BlockChain) HeaderChainForceSetHead(headNumber uint64) {
bc.hc.SetHead(headNumber, nil, createDelFn(bc))
}

func (bc *BlockChain) TxDAGEnabled() bool {
func (bc *BlockChain) TxDAGEnabledWhenMine() bool {
return bc.enableTxDAG
}

func (bc *BlockChain) SetupTxDAGGeneration() {
log.Info("node enable TxDAG feature")
bc.enableTxDAG = true
}
33 changes: 24 additions & 9 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1714,12 +1714,16 @@ func (s *StateDB) StopTxStat(usedGas uint64) {
}
// record stat first
if metrics.EnabledExpensive && s.stat != nil {
s.stat.Done().WithGas(usedGas).WithRead(len(s.rwSet.ReadSet()))
s.stat.Done().WithGas(usedGas)
rwSet := s.mvStates.RWSet(s.txIndex)
if rwSet != nil {
s.stat.WithRead(len(rwSet.ReadSet()))
}
}
}

func (s *StateDB) RecordRead(key types.RWKey, val interface{}) {
if s.rwSet == nil || s.rwSet.RWRecordDone() {
if s.rwSet == nil {
return
}
s.rwSet.RecordRead(key, types.StateVersion{
Expand All @@ -1728,21 +1732,26 @@ func (s *StateDB) RecordRead(key types.RWKey, val interface{}) {
}

func (s *StateDB) RecordWrite(key types.RWKey, val interface{}) {
if s.rwSet == nil || s.rwSet.RWRecordDone() {
if s.rwSet == nil {
return
}
s.rwSet.RecordWrite(key, val)
}

func (s *StateDB) ResetMVStates(txCount int) {
s.mvStates = types.NewMVStates(txCount)
if s.mvStates != nil {
s.mvStates.Stop()
}
s.mvStates = types.NewMVStates(txCount).EnableAsyncDepGen()
s.rwSet = nil
}

func (s *StateDB) FinaliseRWSet() error {
if s.rwSet == nil || s.rwSet.RWRecordDone() {
if s.rwSet == nil {
return nil
}
rwSet := s.rwSet
stat := s.stat
if metrics.EnabledExpensive {
defer func(start time.Time) {
s.TxDAGGenerate += time.Since(start)
Expand All @@ -1751,7 +1760,7 @@ func (s *StateDB) FinaliseRWSet() error {
ver := types.StateVersion{
TxIndex: s.txIndex,
}
if ver != s.rwSet.Version() {
if ver != rwSet.Version() {
return errors.New("you finalize a wrong ver of RWSet")
}

Expand All @@ -1778,8 +1787,13 @@ func (s *StateDB) FinaliseRWSet() error {
}
}

s.rwSet.SetRWRecordDone()
return s.mvStates.FulfillRWSet(s.rwSet, s.stat)
// reset stateDB
s.rwSet = nil
if err := s.mvStates.FulfillRWSet(rwSet, stat); err != nil {
return err
}
// just Finalise rwSet in serial execution
return s.mvStates.Finalise(s.txIndex)
}

func (s *StateDB) getStateObjectsDestruct(addr common.Address) (*types.StateAccount, bool) {
Expand Down Expand Up @@ -1829,7 +1843,8 @@ func (s *StateDB) RecordSystemTxRWSet(index int) {
}
s.mvStates.FulfillRWSet(types.NewRWSet(types.StateVersion{
TxIndex: index,
}).WithSerialFlag(), types.NewExeStat(index).WithSerialFlag())
}).WithExcludedTxFlag(), types.NewExeStat(index).WithExcludedTxFlag())
s.mvStates.Finalise(index)
}

// copySet returns a deep-copied set.
Expand Down
20 changes: 4 additions & 16 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"math/big"
"time"

"github.com/ethereum/go-ethereum/log"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/misc"
Expand Down Expand Up @@ -109,6 +107,10 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
}

// if systemTx or depositTx, tag it
if tx.IsSystemTx() || tx.IsDepositTx() {
statedb.RecordSystemTxRWSet(i)
}
receipts = append(receipts, receipt)
allLogs = append(allLogs, receipt.Logs...)
if metrics.EnabledExpensive {
Expand All @@ -123,20 +125,6 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
}
// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles(), withdrawals)

if p.bc.enableTxDAG {
// compare input TxDAG when it enable in consensus
dag, err := statedb.ResolveTxDAG(len(block.Transactions()), []common.Address{context.Coinbase, params.OptimismBaseFeeRecipient, params.OptimismL1FeeRecipient})
if err == nil {
// TODO(galaio): check TxDAG correctness?
log.Debug("Process TxDAG result", "block", block.NumberU64(), "txDAG", dag)
if metrics.EnabledExpensive {
types.EvaluateTxDAGPerformance(dag, statedb.ResolveStats())
}
} else {
log.Error("ResolveTxDAG err", "block", block.NumberU64(), "tx", len(block.Transactions()), "err", err)
}
}
return receipts, allLogs, *usedGas, nil
}

Expand Down
12 changes: 7 additions & 5 deletions core/state_transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,10 @@ func (st *StateTransition) preCheck() error {
// However if any consensus issue encountered, return the error directly with
// nil evm execution result.
func (st *StateTransition) TransitionDb() (*ExecutionResult, error) {
// start record rw set in here
if !st.msg.IsSystemTx && !st.msg.IsDepositTx {
st.state.BeforeTxTransition()
}
if mint := st.msg.Mint; mint != nil {
mintU256, overflow := uint256.FromBig(mint)
if overflow {
Expand All @@ -435,7 +439,7 @@ func (st *StateTransition) TransitionDb() (*ExecutionResult, error) {
}
// just record error tx here
if ferr := st.state.FinaliseRWSet(); ferr != nil {
log.Error("finalise error deposit tx rwSet fail", "block", st.evm.Context.BlockNumber, "tx", st.evm.StateDB.TxIndex())
log.Error("finalise error deposit tx rwSet fail", "block", st.evm.Context.BlockNumber, "tx", st.evm.StateDB.TxIndex(), "err", ferr)
}
result = &ExecutionResult{
UsedGas: gasUsed,
Expand All @@ -447,15 +451,13 @@ func (st *StateTransition) TransitionDb() (*ExecutionResult, error) {
if err != nil {
// just record error tx here
if ferr := st.state.FinaliseRWSet(); ferr != nil {
log.Error("finalise error tx rwSet fail", "block", st.evm.Context.BlockNumber, "tx", st.evm.StateDB.TxIndex())
log.Error("finalise error tx rwSet fail", "block", st.evm.Context.BlockNumber, "tx", st.evm.StateDB.TxIndex(), "err", ferr)
}
}
return result, err
}

func (st *StateTransition) innerTransitionDb() (*ExecutionResult, error) {
// start record rw set in here
st.state.BeforeTxTransition()
// First check this message satisfies all consensus rules before
// applying the message. The rules include these clauses
//
Expand Down Expand Up @@ -534,7 +536,7 @@ func (st *StateTransition) innerTransitionDb() (*ExecutionResult, error) {

// stop record rw set in here, skip gas fee distribution
if ferr := st.state.FinaliseRWSet(); ferr != nil {
log.Error("finalise tx rwSet fail", "block", st.evm.Context.BlockNumber, "tx", st.evm.StateDB.TxIndex())
log.Error("finalise tx rwSet fail", "block", st.evm.Context.BlockNumber, "tx", st.evm.StateDB.TxIndex(), "err", ferr)
}

// if deposit: skip refunds, skip tipping coinbase
Expand Down
Loading

0 comments on commit ec9ec73

Please sign in to comment.