From f35abea1e54edee083042769a17ca3ae8c459a1f Mon Sep 17 00:00:00 2001 From: setunapo Date: Fri, 25 Nov 2022 14:05:19 +0800 Subject: [PATCH 1/2] worker: reused triePrefetch when multi-fillTransactions to mine a block. avoid too much trie prefetch routines when several fillTransactions are called. --- core/state/statedb.go | 19 +++++++++++++++++++ miner/worker.go | 32 ++++++++++++++++++++------------ 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/core/state/statedb.go b/core/state/statedb.go index e7bf3c9491..f02908e881 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -200,6 +200,25 @@ func (s *StateDB) EnableWriteOnSharedStorage() { s.writeOnSharedStorage = true } +// In mining mode, we will try multi-fillTransactions to get the most profitable one. +// StateDB will be created for each fillTransactions with same block height. +// Share a single triePrefetcher to avoid too much prefetch routines. +func (s *StateDB) TransferPrefetcher(prev *StateDB) { + if prev == nil { + return + } + var fetcher *triePrefetcher + + prev.prefetcherLock.Lock() + fetcher = prev.prefetcher + prev.prefetcher = nil + prev.prefetcherLock.Unlock() + + s.prefetcherLock.Lock() + s.prefetcher = fetcher + s.prefetcherLock.Unlock() +} + // StartPrefetcher initializes a new trie prefetcher to pull in nodes from the // state trie concurrently while the state is mutated so that when we reach the // commit phase, most of the needed data is already hot. diff --git a/miner/worker.go b/miner/worker.go index 200ffb88b2..77ce47f73e 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -718,14 +718,19 @@ func (w *worker) resultLoop() { } // makeEnv creates a new environment for the sealing block. -func (w *worker) makeEnv(parent *types.Block, header *types.Header, coinbase common.Address) (*environment, error) { +func (w *worker) makeEnv(parent *types.Block, header *types.Header, coinbase common.Address, + prevEnv *environment) (*environment, error) { // Retrieve the parent state to execute on top and start a prefetcher for // the miner to speed block sealing up a bit state, err := w.chain.StateAtWithSharedPool(parent.Root()) if err != nil { return nil, err } - state.StartPrefetcher("miner") + if prevEnv == nil { + state.StartPrefetcher("miner") + } else { + state.TransferPrefetcher(prevEnv.state) + } // Note the passed coinbase may be different with header.Coinbase. env := &environment{ @@ -941,6 +946,7 @@ type generateParams struct { random common.Hash // The randomness generated by beacon chain, empty before the merge noUncle bool // Flag whether the uncle block inclusion is allowed noExtra bool // Flag whether the extra field assignment is allowed + prevWork *environment } // prepareWork constructs the sealing task according to the given parameters, @@ -999,7 +1005,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { // Could potentially happen if starting to mine in an odd state. // Note genParams.coinbase can be different with header.Coinbase // since clique algorithm can modify the coinbase field in header. - env, err := w.makeEnv(parent, header, genParams.coinbase) + env, err := w.makeEnv(parent, header, genParams.coinbase, genParams.prevWork) if err != nil { log.Error("Failed to create sealing context", "err", err) return nil, err @@ -1105,27 +1111,29 @@ func (w *worker) commitWork(interruptCh chan int32, timestamp int64) { // validator can try several times to get the most profitable block, // as long as the timestamp is not reached. workList := make([]*environment, 0, 10) - var bestWork *environment + var prevWork *environment // workList clean up defer func() { - for _, w := range workList { + for _, wk := range workList { // only keep the best work, discard others. - if w == bestWork { + if wk == w.current { continue } - w.discard() + wk.discard() } }() + LOOP: for { work, err := w.prepareWork(&generateParams{ timestamp: uint64(timestamp), coinbase: coinbase, + prevWork: prevWork, }) if err != nil { return } - + prevWork = work workList = append(workList, work) delay := w.engine.Delay(w.chain, work.header, &w.config.DelayLeftOver) @@ -1215,13 +1223,13 @@ LOOP: } } // get the most profitable work - bestWork = workList[0] + bestWork := workList[0] bestReward := new(big.Int) - for i, w := range workList { - balance := w.state.GetBalance(consensus.SystemAddress) + for i, wk := range workList { + balance := wk.state.GetBalance(consensus.SystemAddress) log.Debug("Get the most profitable work", "index", i, "balance", balance, "bestReward", bestReward) if balance.Cmp(bestReward) > 0 { - bestWork = w + bestWork = wk bestReward = balance } } From e1d3d5edcb6d1744ecd61d3f37a5594c3e927a01 Mon Sep 17 00:00:00 2001 From: setunapo Date: Fri, 25 Nov 2022 17:46:22 +0800 Subject: [PATCH 2/2] worker: some Subscribe NewTxsEvent changes of miner worker 1.remove the unnecessary NewTxsEvent subscriber, which was used for PoW resubmit check. 2.unsubscribe ASAP before another fillTransactions, to avoid block others. --- miner/worker.go | 30 +++--------------------------- 1 file changed, 3 insertions(+), 27 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index 77ce47f73e..599032c217 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -188,8 +188,6 @@ type worker struct { // Subscriptions mux *event.TypeMux - txsCh chan core.NewTxsEvent - txsSub event.Subscription chainHeadCh chan core.ChainHeadEvent chainHeadSub event.Subscription chainSideCh chan core.ChainSideEvent @@ -225,7 +223,6 @@ type worker struct { // atomic status counters running int32 // The indicator whether the consensus engine is running or not. - newTxs int32 // New arrival transaction count since last sealing work submitting. // External functions isLocalBlock func(header *types.Header) bool // Function used to determine whether the specified block is mined by local miner. @@ -253,7 +250,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus remoteUncles: make(map[common.Hash]*types.Block), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), sealingLogAtDepth), pendingTasks: make(map[common.Hash]*task), - txsCh: make(chan core.NewTxsEvent, txChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), newWorkCh: make(chan *newWorkReq), @@ -265,8 +261,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus resubmitIntervalCh: make(chan time.Duration), recentMinedBlocks: recentMinedBlocks, } - // Subscribe NewTxsEvent for tx pool - worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) // Subscribe events for blockchain worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) @@ -397,7 +391,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) { return } timer.Reset(recommit) - atomic.StoreInt32(&w.newTxs, 0) } // clearPending cleans the stale pending tasks. clearPending := func(number uint64) { @@ -442,10 +435,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) { if w.isRunning() && ((w.chainConfig.Ethash != nil) || (w.chainConfig.Clique != nil && w.chainConfig.Clique.Period > 0) || (w.chainConfig.Parlia != nil && w.chainConfig.Parlia.Period > 0)) { // Short circuit if no new transaction arrives. - if atomic.LoadInt32(&w.newTxs) == 0 { - timer.Reset(recommit) - continue - } commit(commitInterruptResubmit) } @@ -473,7 +462,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) { // submit it or return task according to given parameters for various proposes. func (w *worker) mainLoop() { defer w.wg.Done() - defer w.txsSub.Unsubscribe() defer w.chainHeadSub.Unsubscribe() defer w.chainSideSub.Unsubscribe() defer func() { @@ -539,24 +527,9 @@ func (w *worker) mainLoop() { } } - case ev := <-w.txsCh: - if w.isRunning() { - // Special case, if the consensus engine is 0 period clique(dev mode), - // submit sealing work here since all empty submission will be rejected - // by clique. Of course the advance sealing(empty submission) is disabled. - if (w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0) || - (w.chainConfig.Parlia != nil && w.chainConfig.Parlia.Period == 0) { - w.commitWork(nil, time.Now().Unix()) - } - } - - atomic.AddInt32(&w.newTxs, int32(len(ev.Txs))) - // System stopped case <-w.exitCh: return - case <-w.txsSub.Err(): - return case <-w.chainHeadSub.Err(): return case <-w.chainSideSub.Err(): @@ -1221,6 +1194,9 @@ LOOP: } } } + // if sub's channel if full, it will block other NewTxsEvent subscribers, + // so unsubscribe ASAP and Unsubscribe() is re-enterable, safe to call several time. + sub.Unsubscribe() } // get the most profitable work bestWork := workList[0]