Skip to content

Commit

Permalink
mvstates: fix async dep gen deadlock issue & opt mining txdag generat…
Browse files Browse the repository at this point in the history
…ion; (bnb-chain#35)

* mvstates: fix async dep gen deadlock issue;
miner: support record sysytem tx rwset;

* miner: opt txdag enable checking;

---------

Co-authored-by: galaio <[email protected]>
  • Loading branch information
galaio and galaio authored Aug 15, 2024
1 parent 0854a98 commit 808d048
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 9 deletions.
4 changes: 2 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2831,8 +2831,8 @@ func (bc *BlockChain) HeaderChainForceSetHead(headNumber uint64) {
bc.hc.SetHead(headNumber, nil, createDelFn(bc))
}

func (bc *BlockChain) TxDAGEnabled() bool {
return bc.enableTxDAG
func (bc *BlockChain) TxDAGEnabledWhenMine() bool {
return bc.enableTxDAG && bc.txDAGReader == nil
}

func (bc *BlockChain) TxDAGFileOpened() bool {
Expand Down
10 changes: 8 additions & 2 deletions core/types/mvstates.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ const (
AccountSuicide
)

const (
asyncDepGenChanSize = 100
)

func AccountStateKey(account common.Address, state AccountState) RWKey {
var key RWKey
key[0] = AccountStatePrefix
Expand Down Expand Up @@ -324,7 +328,7 @@ func NewMVStates(txCount int) *MVStates {
}

func (s *MVStates) EnableAsyncDepGen() *MVStates {
s.depsGenChan = make(chan int, 100)
s.depsGenChan = make(chan int, asyncDepGenChanSize)
s.stopChan = make(chan struct{}, 1)
go s.asyncDepGenLoop()
return s
Expand Down Expand Up @@ -434,7 +438,9 @@ func (s *MVStates) Finalise(index int) error {
s.nextFinaliseIndex++
// async resolve dependency
if s.depsGenChan != nil {
s.depsGenChan <- index
go func() {
s.depsGenChan <- index
}()
}
return nil
}
Expand Down
15 changes: 14 additions & 1 deletion core/types/mvstates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/stretchr/testify/require"
)

const mockRWSetSize = 10000
const mockRWSetSize = 5000

func TestMVStates_BasicUsage(t *testing.T) {
ms := NewMVStates(0)
Expand Down Expand Up @@ -91,6 +91,19 @@ func TestMVStates_AsyncDepGen_SimpleResolveTxDAG(t *testing.T) {
t.Log(dag)
}

func TestMVStates_ResolveTxDAG_Async(t *testing.T) {
txCnt := 10000
rwSets := mockRandomRWSet(txCnt)
ms1 := NewMVStates(txCnt).EnableAsyncDepGen()
for i := 0; i < txCnt; i++ {
require.NoError(t, ms1.FulfillRWSet(rwSets[i], nil))
require.NoError(t, ms1.Finalise(i))
}
time.Sleep(100 * time.Millisecond)
_, err := ms1.ResolveTxDAG(txCnt, nil)
require.NoError(t, err)
}

func TestMVStates_ResolveTxDAG_Compare(t *testing.T) {
txCnt := 3000
rwSets := mockRandomRWSet(txCnt)
Expand Down
11 changes: 7 additions & 4 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,7 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn
//append the tx DAG transaction to the block
appendTxDAG := func() {
// whether enable TxDAG
if !w.chain.TxDAGEnabled() {
if !w.chain.TxDAGEnabledWhenMine() {
return
}
// whether export to file
Expand Down Expand Up @@ -1225,9 +1225,6 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err
}

start := time.Now()
if w.chain.TxDAGEnabled() {
env.state.ResetMVStates(0)
}
pending := w.eth.TxPool().Pending(true)
packFromTxpoolTimer.UpdateSince(start)
log.Debug("packFromTxpoolTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", env.header.Hash())
Expand Down Expand Up @@ -1286,13 +1283,19 @@ func (w *worker) generateWork(genParams *generateParams) *newPayloadResult {
misc.EnsureCreate2Deployer(w.chainConfig, work.header.Time, work.state)

start := time.Now()
if w.chain.TxDAGEnabledWhenMine() {
work.state.ResetMVStates(0)
}
for _, tx := range genParams.txs {
from, _ := types.Sender(work.signer, tx)
work.state.SetTxContext(tx.Hash(), work.tcount)
_, err := w.commitTransaction(work, tx)
if err != nil {
return &newPayloadResult{err: fmt.Errorf("failed to force-include tx: %s type: %d sender: %s nonce: %d, err: %w", tx.Hash(), tx.Type(), from, tx.Nonce(), err)}
}
if tx.IsSystemTx() || tx.IsDepositTx() {
work.state.RecordSystemTxRWSet(work.tcount)
}
work.tcount++
}
commitDepositTxsTimer.UpdateSince(start)
Expand Down

0 comments on commit 808d048

Please sign in to comment.