From d1ed977d898479281d3eb2b2adbea8dd901ae515 Mon Sep 17 00:00:00 2001 From: setunapo Date: Thu, 10 Nov 2022 15:05:56 +0800 Subject: [PATCH] worker: change interrupt type from *int to chan channel operation is preferred than atomic value check in golang. And it will help for the further refactor on worker. --- miner/worker.go | 63 ++++++++++++++++++++++++++++--------------------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index 0838defdb4..3e0d225f78 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -143,16 +143,15 @@ type task struct { } const ( - commitInterruptNone int32 = iota - commitInterruptNewHead - commitInterruptResubmit + commitInterruptNewHead int32 = 1 + commitInterruptResubmit int32 = 2 ) // newWorkReq represents a request for new sealing work submitting with relative interrupt notifier. type newWorkReq struct { - interrupt *int32 - noempty bool - timestamp int64 + interruptCh chan int32 + noempty bool + timestamp int64 } // getWorkReq represents a request for getting a new sealing work with provided parameters. @@ -377,7 +376,7 @@ func (w *worker) close() { func (w *worker) newWorkLoop(recommit time.Duration) { defer w.wg.Done() var ( - interrupt *int32 + interruptCh chan int32 minRecommit = recommit // minimal resubmit interval specified by user. timestamp int64 // timestamp for each round of sealing. ) @@ -387,13 +386,15 @@ func (w *worker) newWorkLoop(recommit time.Duration) { <-timer.C // discard the initial tick // commit aborts in-flight transaction execution with given signal and resubmits a new one. - commit := func(noempty bool, s int32) { - if interrupt != nil { - atomic.StoreInt32(interrupt, s) + commit := func(noempty bool, reason int32) { + if interruptCh != nil { + // each commit work will have its own interruptCh to stop work with a reason + interruptCh <- reason + close(interruptCh) } - interrupt = new(int32) + interruptCh = make(chan int32, 1) select { - case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}: + case w.newWorkCh <- &newWorkReq{interruptCh: interruptCh, noempty: noempty, timestamp: timestamp}: case <-w.exitCh: return } @@ -489,7 +490,7 @@ func (w *worker) mainLoop() { for { select { case req := <-w.newWorkCh: - w.commitWork(req.interrupt, req.noempty, req.timestamp) + w.commitWork(req.interruptCh, req.noempty, req.timestamp) case req := <-w.getWorkCh: block, err := w.generateWork(req.params) @@ -796,7 +797,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, rece return receipt.Logs, nil } -func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) bool { +func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interruptCh chan int32) bool { gasLimit := env.header.GasLimit if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(gasLimit) @@ -822,24 +823,32 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP } bloomProcessors := core.NewAsyncReceiptBloomGenerator(processorCapacity) - interruptCh := make(chan struct{}) - defer close(interruptCh) + stopPrefetchCh := make(chan struct{}) + defer close(stopPrefetchCh) //prefetch txs from all pending txs txsPrefetch := txs.Copy() tx := txsPrefetch.Peek() txCurr := &tx - w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), interruptCh, txCurr) + w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), stopPrefetchCh, txCurr) LOOP: for { // In the following three cases, we will interrupt the execution of the transaction. - // (1) new head block event arrival, the interrupt signal is 1 - // (2) worker start or restart, the interrupt signal is 1 - // (3) worker recreate the sealing block with any newly arrived transactions, the interrupt signal is 2. + // (1) new head block event arrival, the reason is 1 + // (2) worker start or restart, the reason is 1 + // (3) worker recreate the sealing block with any newly arrived transactions, the reason is 2. // For the first two cases, the semi-finished work will be discarded. // For the third case, the semi-finished work will be submitted to the consensus engine. - if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { - return atomic.LoadInt32(interrupt) == commitInterruptNewHead + if interruptCh != nil { + select { + case reason, ok := <-interruptCh: + if !ok { + // should never be here, since interruptCh should not be read before + log.Warn("commit transactions stopped unknown") + } + return reason == commitInterruptNewHead + default: + } } // If we don't have enough gas for any further transactions then we're done if env.gasPool.Gas() < params.TxGas { @@ -1028,7 +1037,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { // fillTransactions retrieves the pending transactions from the txpool and fills them // into the given sealing block. The transaction selection and ordering strategy can // be customized with the plugin in the future. -func (w *worker) fillTransactions(interrupt *int32, env *environment) { +func (w *worker) fillTransactions(interruptCh chan int32, env *environment) { // Split the pending transactions into locals and remotes // Fill the block with all available pending transactions. pending := w.eth.TxPool().Pending(false) @@ -1041,13 +1050,13 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) { } if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interrupt) { + if w.commitTransactions(env, txs, interruptCh) { return } } if len(remoteTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interrupt) { + if w.commitTransactions(env, txs, interruptCh) { return } } @@ -1068,7 +1077,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) { // commitWork generates several new sealing tasks based on the parent block // and submit them to the sealer. -func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { +func (w *worker) commitWork(interruptCh chan int32, noempty bool, timestamp int64) { start := time.Now() // Set the coinbase if the worker is running or it's required @@ -1094,7 +1103,7 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { w.commit(work, nil, false, start) } // Fill pending transactions from the txpool - w.fillTransactions(interrupt, work) + w.fillTransactions(interruptCh, work) w.commit(work, w.fullTaskHook, true, start) // Swap out the old work with the new one, terminating any leftover