From 5eff61cad0a2c5694b91298d57ee065f3c8ff616 Mon Sep 17 00:00:00 2001 From: setunapo Date: Thu, 10 Nov 2022 11:07:12 +0800 Subject: [PATCH 1/2] worker: remove the code of resubmit interval adjust resubmit intervalAdjust is for PoW only, to remove it to make worker simpler. With PoW, there will be a periodic timer to check if it is the time to stop packing transaction and start calculating the desired hash value, since other miner could succeed in hash compute if it spends too much time packing transactions. It will commit the current fruit to calculate root at a reasonable time. And it will schedule a new work to get a big block if new transaction was received. When there are too many transactions in the TxPool, the interval of the resubmit timer would be increased and vice versa. But it is not needed with PoS related consensus, since the block interval is determined in PoS, and there is already a timer to stop too long packing. --- miner/worker.go | 78 ------------------------------------ miner/worker_test.go | 95 -------------------------------------------- 2 files changed, 173 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index dca827f6d6..1f32e62f65 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -53,9 +53,6 @@ const ( // chainSideChanSize is the size of channel listening to ChainSideEvent. chainSideChanSize = 10 - // resubmitAdjustChanSize is the size of resubmitting interval adjustment channel. - resubmitAdjustChanSize = 10 - // sealingLogAtDepth is the number of confirmations before logging successful mining. sealingLogAtDepth = 11 @@ -63,18 +60,6 @@ const ( // any newly arrived transactions. minRecommitInterval = 1 * time.Second - // maxRecommitInterval is the maximum time interval to recreate the sealing block with - // any newly arrived transactions. - maxRecommitInterval = 15 * time.Second - - // intervalAdjustRatio is the impact a single interval adjustment has on sealing work - // resubmitting interval. - intervalAdjustRatio = 0.1 - - // intervalAdjustBias is applied during the new resubmit interval calculation in favor of - // increasing upper limit or decreasing lower limit so that the limit can be reachable. - intervalAdjustBias = 200 * 1000.0 * 1000.0 - // staleThreshold is the maximum depth of the acceptable stale block. staleThreshold = 11 ) @@ -177,12 +162,6 @@ type getWorkReq struct { result chan *types.Block } -// intervalAdjust represents a resubmitting interval adjustment. -type intervalAdjust struct { - ratio float64 - inc bool -} - // worker is the main object which takes care of submitting new work to consensus engine // and gathering the sealing result. type worker struct { @@ -213,7 +192,6 @@ type worker struct { startCh chan struct{} exitCh chan struct{} resubmitIntervalCh chan time.Duration - resubmitAdjustCh chan *intervalAdjust wg sync.WaitGroup @@ -279,7 +257,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus exitCh: make(chan struct{}), startCh: make(chan struct{}, 1), resubmitIntervalCh: make(chan time.Duration), - resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), } // Subscribe NewTxsEvent for tx pool worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) @@ -396,28 +373,6 @@ func (w *worker) close() { w.wg.Wait() } -// recalcRecommit recalculates the resubmitting interval upon feedback. -func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) time.Duration { - var ( - prevF = float64(prev.Nanoseconds()) - next float64 - ) - if inc { - next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias) - max := float64(maxRecommitInterval.Nanoseconds()) - if next > max { - next = max - } - } else { - next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias) - min := float64(minRecommit.Nanoseconds()) - if next < min { - next = min - } - } - return time.Duration(int64(next)) -} - // newWorkLoop is a standalone goroutine to submit new sealing work upon received events. func (w *worker) newWorkLoop(recommit time.Duration) { defer w.wg.Done() @@ -508,23 +463,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) { w.resubmitHook(minRecommit, recommit) } - case adjust := <-w.resubmitAdjustCh: - // Adjust resubmit interval by feedback. - if adjust.inc { - before := recommit - target := float64(recommit.Nanoseconds()) / adjust.ratio - recommit = recalcRecommit(minRecommit, recommit, target, true) - log.Trace("Increase miner recommit interval", "from", before, "to", recommit) - } else { - before := recommit - recommit = recalcRecommit(minRecommit, recommit, float64(minRecommit.Nanoseconds()), false) - log.Trace("Decrease miner recommit interval", "from", before, "to", recommit) - } - - if w.resubmitHook != nil { - w.resubmitHook(minRecommit, recommit) - } - case <-w.exitCh: return } @@ -901,17 +839,6 @@ LOOP: // For the first two cases, the semi-finished work will be discarded. // For the third case, the semi-finished work will be submitted to the consensus engine. if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { - // Notify resubmit loop to increase resubmitting interval due to too frequent commits. - if atomic.LoadInt32(interrupt) == commitInterruptResubmit { - ratio := float64(gasLimit-env.gasPool.Gas()) / float64(gasLimit) - if ratio < 0.1 { - ratio = 0.1 - } - w.resubmitAdjustCh <- &intervalAdjust{ - ratio: ratio, - inc: true, - } - } return atomic.LoadInt32(interrupt) == commitInterruptNewHead } // If we don't have enough gas for any further transactions then we're done @@ -998,11 +925,6 @@ LOOP: } w.pendingLogsFeed.Send(cpy) } - // Notify resubmit loop to decrease resubmitting interval if current interval is larger - // than the user-specified one. - if interrupt != nil { - w.resubmitAdjustCh <- &intervalAdjust{inc: false} - } return false } diff --git a/miner/worker_test.go b/miner/worker_test.go index 2cdd4284e6..e405788c61 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -20,7 +20,6 @@ import ( "errors" "math/big" "math/rand" - "sync/atomic" "testing" "time" @@ -270,100 +269,6 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool) { } } -func TestAdjustIntervalEthash(t *testing.T) { - testAdjustInterval(t, ethashChainConfig, ethash.NewFaker()) -} - -func TestAdjustIntervalClique(t *testing.T) { - testAdjustInterval(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase())) -} - -func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { - defer engine.Close() - - w, _ := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0) - defer w.close() - - w.skipSealHook = func(task *task) bool { - return true - } - w.fullTaskHook = func() { - time.Sleep(100 * time.Millisecond) - } - var ( - progress = make(chan struct{}, 10) - result = make([]float64, 0, 10) - index = 0 - start uint32 - ) - w.resubmitHook = func(minInterval time.Duration, recommitInterval time.Duration) { - // Short circuit if interval checking hasn't started. - if atomic.LoadUint32(&start) == 0 { - return - } - var wantMinInterval, wantRecommitInterval time.Duration - - switch index { - case 0: - wantMinInterval, wantRecommitInterval = 3*time.Second, 3*time.Second - case 1: - origin := float64(3 * time.Second.Nanoseconds()) - estimate := origin*(1-intervalAdjustRatio) + intervalAdjustRatio*(origin/0.8+intervalAdjustBias) - wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(estimate)*time.Nanosecond - case 2: - estimate := result[index-1] - min := float64(3 * time.Second.Nanoseconds()) - estimate = estimate*(1-intervalAdjustRatio) + intervalAdjustRatio*(min-intervalAdjustBias) - wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(estimate)*time.Nanosecond - case 3: - wantMinInterval, wantRecommitInterval = time.Second, time.Second - } - - // Check interval - if minInterval != wantMinInterval { - t.Errorf("resubmit min interval mismatch: have %v, want %v ", minInterval, wantMinInterval) - } - if recommitInterval != wantRecommitInterval { - t.Errorf("resubmit interval mismatch: have %v, want %v", recommitInterval, wantRecommitInterval) - } - result = append(result, float64(recommitInterval.Nanoseconds())) - index += 1 - progress <- struct{}{} - } - w.start() - - time.Sleep(time.Second) // Ensure two tasks have been summitted due to start opt - atomic.StoreUint32(&start, 1) - - w.setRecommitInterval(3 * time.Second) - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } - - w.resubmitAdjustCh <- &intervalAdjust{inc: true, ratio: 0.8} - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } - - w.resubmitAdjustCh <- &intervalAdjust{inc: false} - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } - - w.setRecommitInterval(500 * time.Millisecond) - select { - case <-progress: - case <-time.NewTimer(time.Second).C: - t.Error("interval reset timeout") - } -} - func TestGetSealingWorkEthash(t *testing.T) { testGetSealingWork(t, ethashChainConfig, ethash.NewFaker(), false) } From 4728db7b292cc7bfe0264693ab447493a672b77b Mon Sep 17 00:00:00 2001 From: setunapo Date: Thu, 10 Nov 2022 15:05:56 +0800 Subject: [PATCH 2/2] worker: change interrupt type from *int to chan channel operation is preferred than atomic value check in golang. And it will help for the further refactor on worker. --- miner/worker.go | 63 ++++++++++++++++++++++++++++--------------------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index 1f32e62f65..0e0ca03ee2 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -143,16 +143,15 @@ type task struct { } const ( - commitInterruptNone int32 = iota - commitInterruptNewHead - commitInterruptResubmit + commitInterruptNewHead int32 = 1 + commitInterruptResubmit int32 = 2 ) // newWorkReq represents a request for new sealing work submitting with relative interrupt notifier. type newWorkReq struct { - interrupt *int32 - noempty bool - timestamp int64 + interruptCh chan int32 + noempty bool + timestamp int64 } // getWorkReq represents a request for getting a new sealing work with provided parameters. @@ -377,7 +376,7 @@ func (w *worker) close() { func (w *worker) newWorkLoop(recommit time.Duration) { defer w.wg.Done() var ( - interrupt *int32 + interruptCh chan int32 minRecommit = recommit // minimal resubmit interval specified by user. timestamp int64 // timestamp for each round of sealing. ) @@ -387,13 +386,15 @@ func (w *worker) newWorkLoop(recommit time.Duration) { <-timer.C // discard the initial tick // commit aborts in-flight transaction execution with given signal and resubmits a new one. - commit := func(noempty bool, s int32) { - if interrupt != nil { - atomic.StoreInt32(interrupt, s) + commit := func(noempty bool, reason int32) { + if interruptCh != nil { + // each commit work will have its own interruptCh to stop work with a reason + interruptCh <- reason + close(interruptCh) } - interrupt = new(int32) + interruptCh = make(chan int32, 1) select { - case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}: + case w.newWorkCh <- &newWorkReq{interruptCh: interruptCh, noempty: noempty, timestamp: timestamp}: case <-w.exitCh: return } @@ -489,7 +490,7 @@ func (w *worker) mainLoop() { for { select { case req := <-w.newWorkCh: - w.commitWork(req.interrupt, req.noempty, req.timestamp) + w.commitWork(req.interruptCh, req.noempty, req.timestamp) case req := <-w.getWorkCh: block, err := w.generateWork(req.params) @@ -796,7 +797,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, rece return receipt.Logs, nil } -func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) bool { +func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interruptCh chan int32) bool { gasLimit := env.header.GasLimit if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(gasLimit) @@ -822,24 +823,32 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP } bloomProcessors := core.NewAsyncReceiptBloomGenerator(processorCapacity) - interruptCh := make(chan struct{}) - defer close(interruptCh) + stopPrefetchCh := make(chan struct{}) + defer close(stopPrefetchCh) //prefetch txs from all pending txs txsPrefetch := txs.Copy() tx := txsPrefetch.Peek() txCurr := &tx - w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), interruptCh, txCurr) + w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), stopPrefetchCh, txCurr) LOOP: for { // In the following three cases, we will interrupt the execution of the transaction. - // (1) new head block event arrival, the interrupt signal is 1 - // (2) worker start or restart, the interrupt signal is 1 - // (3) worker recreate the sealing block with any newly arrived transactions, the interrupt signal is 2. + // (1) new head block event arrival, the reason is 1 + // (2) worker start or restart, the reason is 1 + // (3) worker recreate the sealing block with any newly arrived transactions, the reason is 2. // For the first two cases, the semi-finished work will be discarded. // For the third case, the semi-finished work will be submitted to the consensus engine. - if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { - return atomic.LoadInt32(interrupt) == commitInterruptNewHead + if interruptCh != nil { + select { + case reason, ok := <-interruptCh: + if !ok { + // should never be here, since interruptCh should not be read before + log.Warn("commit transactions stopped unknown") + } + return reason == commitInterruptNewHead + default: + } } // If we don't have enough gas for any further transactions then we're done if env.gasPool.Gas() < params.TxGas { @@ -1028,7 +1037,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { // fillTransactions retrieves the pending transactions from the txpool and fills them // into the given sealing block. The transaction selection and ordering strategy can // be customized with the plugin in the future. -func (w *worker) fillTransactions(interrupt *int32, env *environment) { +func (w *worker) fillTransactions(interruptCh chan int32, env *environment) { // Split the pending transactions into locals and remotes // Fill the block with all available pending transactions. pending := w.eth.TxPool().Pending(true) @@ -1041,13 +1050,13 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) { } if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interrupt) { + if w.commitTransactions(env, txs, interruptCh) { return } } if len(remoteTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interrupt) { + if w.commitTransactions(env, txs, interruptCh) { return } } @@ -1068,7 +1077,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) { // commitWork generates several new sealing tasks based on the parent block // and submit them to the sealer. -func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { +func (w *worker) commitWork(interruptCh chan int32, noempty bool, timestamp int64) { start := time.Now() // Set the coinbase if the worker is running or it's required @@ -1094,7 +1103,7 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { w.commit(work, nil, false, start) } // Fill pending transactions from the txpool - w.fillTransactions(interrupt, work) + w.fillTransactions(interruptCh, work) w.commit(work, w.fullTaskHook, true, start) // Swap out the old work with the new one, terminating any leftover