Skip to content

Commit

Permalink
worker: some Subscribe NewTxsEvent changes of miner worker
Browse files Browse the repository at this point in the history
1.remove the unnecessary NewTxsEvent subscriber, which was used for PoW resubmit check.
2.unsubscribe ASAP before another fillTransactions, to avoid block others.
  • Loading branch information
setunapo committed Nov 26, 2022
1 parent f35abea commit e1d3d5e
Showing 1 changed file with 3 additions and 27 deletions.
30 changes: 3 additions & 27 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ type worker struct {

// Subscriptions
mux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
chainHeadCh chan core.ChainHeadEvent
chainHeadSub event.Subscription
chainSideCh chan core.ChainSideEvent
Expand Down Expand Up @@ -225,7 +223,6 @@ type worker struct {

// atomic status counters
running int32 // The indicator whether the consensus engine is running or not.
newTxs int32 // New arrival transaction count since last sealing work submitting.

// External functions
isLocalBlock func(header *types.Header) bool // Function used to determine whether the specified block is mined by local miner.
Expand Down Expand Up @@ -253,7 +250,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
remoteUncles: make(map[common.Hash]*types.Block),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), sealingLogAtDepth),
pendingTasks: make(map[common.Hash]*task),
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
newWorkCh: make(chan *newWorkReq),
Expand All @@ -265,8 +261,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
resubmitIntervalCh: make(chan time.Duration),
recentMinedBlocks: recentMinedBlocks,
}
// Subscribe NewTxsEvent for tx pool
worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
// Subscribe events for blockchain
worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
Expand Down Expand Up @@ -397,7 +391,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
return
}
timer.Reset(recommit)
atomic.StoreInt32(&w.newTxs, 0)
}
// clearPending cleans the stale pending tasks.
clearPending := func(number uint64) {
Expand Down Expand Up @@ -442,10 +435,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
if w.isRunning() && ((w.chainConfig.Ethash != nil) || (w.chainConfig.Clique != nil &&
w.chainConfig.Clique.Period > 0) || (w.chainConfig.Parlia != nil && w.chainConfig.Parlia.Period > 0)) {
// Short circuit if no new transaction arrives.
if atomic.LoadInt32(&w.newTxs) == 0 {
timer.Reset(recommit)
continue
}
commit(commitInterruptResubmit)
}

Expand Down Expand Up @@ -473,7 +462,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
// submit it or return task according to given parameters for various proposes.
func (w *worker) mainLoop() {
defer w.wg.Done()
defer w.txsSub.Unsubscribe()
defer w.chainHeadSub.Unsubscribe()
defer w.chainSideSub.Unsubscribe()
defer func() {
Expand Down Expand Up @@ -539,24 +527,9 @@ func (w *worker) mainLoop() {
}
}

case ev := <-w.txsCh:
if w.isRunning() {
// Special case, if the consensus engine is 0 period clique(dev mode),
// submit sealing work here since all empty submission will be rejected
// by clique. Of course the advance sealing(empty submission) is disabled.
if (w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0) ||
(w.chainConfig.Parlia != nil && w.chainConfig.Parlia.Period == 0) {
w.commitWork(nil, time.Now().Unix())
}
}

atomic.AddInt32(&w.newTxs, int32(len(ev.Txs)))

// System stopped
case <-w.exitCh:
return
case <-w.txsSub.Err():
return
case <-w.chainHeadSub.Err():
return
case <-w.chainSideSub.Err():
Expand Down Expand Up @@ -1221,6 +1194,9 @@ LOOP:
}
}
}
// if sub's channel if full, it will block other NewTxsEvent subscribers,
// so unsubscribe ASAP and Unsubscribe() is re-enterable, safe to call several time.
sub.Unsubscribe()
}
// get the most profitable work
bestWork := workList[0]
Expand Down

0 comments on commit e1d3d5e

Please sign in to comment.