diff --git a/miner/worker.go b/miner/worker.go index 77ce47f73e..599032c217 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -188,8 +188,6 @@ type worker struct { // Subscriptions mux *event.TypeMux - txsCh chan core.NewTxsEvent - txsSub event.Subscription chainHeadCh chan core.ChainHeadEvent chainHeadSub event.Subscription chainSideCh chan core.ChainSideEvent @@ -225,7 +223,6 @@ type worker struct { // atomic status counters running int32 // The indicator whether the consensus engine is running or not. - newTxs int32 // New arrival transaction count since last sealing work submitting. // External functions isLocalBlock func(header *types.Header) bool // Function used to determine whether the specified block is mined by local miner. @@ -253,7 +250,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus remoteUncles: make(map[common.Hash]*types.Block), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), sealingLogAtDepth), pendingTasks: make(map[common.Hash]*task), - txsCh: make(chan core.NewTxsEvent, txChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), newWorkCh: make(chan *newWorkReq), @@ -265,8 +261,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus resubmitIntervalCh: make(chan time.Duration), recentMinedBlocks: recentMinedBlocks, } - // Subscribe NewTxsEvent for tx pool - worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) // Subscribe events for blockchain worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) @@ -397,7 +391,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) { return } timer.Reset(recommit) - atomic.StoreInt32(&w.newTxs, 0) } // clearPending cleans the stale pending tasks. clearPending := func(number uint64) { @@ -442,10 +435,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) { if w.isRunning() && ((w.chainConfig.Ethash != nil) || (w.chainConfig.Clique != nil && w.chainConfig.Clique.Period > 0) || (w.chainConfig.Parlia != nil && w.chainConfig.Parlia.Period > 0)) { // Short circuit if no new transaction arrives. - if atomic.LoadInt32(&w.newTxs) == 0 { - timer.Reset(recommit) - continue - } commit(commitInterruptResubmit) } @@ -473,7 +462,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) { // submit it or return task according to given parameters for various proposes. func (w *worker) mainLoop() { defer w.wg.Done() - defer w.txsSub.Unsubscribe() defer w.chainHeadSub.Unsubscribe() defer w.chainSideSub.Unsubscribe() defer func() { @@ -539,24 +527,9 @@ func (w *worker) mainLoop() { } } - case ev := <-w.txsCh: - if w.isRunning() { - // Special case, if the consensus engine is 0 period clique(dev mode), - // submit sealing work here since all empty submission will be rejected - // by clique. Of course the advance sealing(empty submission) is disabled. - if (w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0) || - (w.chainConfig.Parlia != nil && w.chainConfig.Parlia.Period == 0) { - w.commitWork(nil, time.Now().Unix()) - } - } - - atomic.AddInt32(&w.newTxs, int32(len(ev.Txs))) - // System stopped case <-w.exitCh: return - case <-w.txsSub.Err(): - return case <-w.chainHeadSub.Err(): return case <-w.chainSideSub.Err(): @@ -1221,6 +1194,9 @@ LOOP: } } } + // if sub's channel if full, it will block other NewTxsEvent subscribers, + // so unsubscribe ASAP and Unsubscribe() is re-enterable, safe to call several time. + sub.Unsubscribe() } // get the most profitable work bestWork := workList[0]