Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth: close miner on exit (instead of just stopping) #21992

Merged
merged 3 commits into from
Oct 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func (s *Ethereum) Stop() error {
s.bloomIndexer.Close()
close(s.closeBloomHandler)
s.txPool.Stop()
s.miner.Stop()
s.miner.Close()
s.blockchain.Stop()
s.engine.Close()
rawdb.PopUncleanShutdownMarker(s.chainDb)
Expand Down
8 changes: 7 additions & 1 deletion miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package miner
import (
"fmt"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -63,6 +64,8 @@ type Miner struct {
exitCh chan struct{}
startCh chan common.Address
stopCh chan struct{}

wg sync.WaitGroup
}

func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, isLocalBlock func(block *types.Block) bool) *Miner {
Expand All @@ -75,8 +78,8 @@ func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *even
stopCh: make(chan struct{}),
worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true),
}
miner.wg.Add(1)
go miner.update()

return miner
}

Expand All @@ -85,6 +88,8 @@ func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *even
// the loop is exited. This to prevent a major security vuln where external parties can DOS you with blocks
// and halt your mining operation for as long as the DOS continues.
func (miner *Miner) update() {
defer miner.wg.Done()

events := miner.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
defer func() {
if !events.Closed() {
Expand Down Expand Up @@ -154,6 +159,7 @@ func (miner *Miner) Stop() {

func (miner *Miner) Close() {
close(miner.exitCh)
miner.wg.Wait()
}

func (miner *Miner) Mining() bool {
Expand Down
8 changes: 8 additions & 0 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ type worker struct {
resubmitIntervalCh chan time.Duration
resubmitAdjustCh chan *intervalAdjust

wg sync.WaitGroup

current *environment // An environment for current running cycle.
localUncles map[common.Hash]*types.Block // A set of side blocks generated locally as the possible uncle blocks.
remoteUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks.
Expand Down Expand Up @@ -225,6 +227,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
recommit = minRecommitInterval
}

worker.wg.Add(4)
go worker.mainLoop()
go worker.newWorkLoop(recommit)
go worker.resultLoop()
Comment on lines +230 to 233
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm suggesting an approach which I think makes managing the wg a bit easier, by keeping all the code contained in one place.

Suggested change
worker.wg.Add(4)
go worker.mainLoop()
go worker.newWorkLoop(recommit)
go worker.resultLoop()
worker.wg.Add(4)
go func() {
defer worker.wg.Done()
worker.mainLoop()
}()
go func() {
defer worker.wg.Done()
worker.newWorkLoop(recommit)
}()
go func() {
defer worker.wg.Done()
worker.resultLoop()
}()
go func() {
defer worker.wg.Done()
worker.taskLoop()
}()

Expand Down Expand Up @@ -323,6 +326,7 @@ func (w *worker) close() {
}
atomic.StoreInt32(&w.running, 0)
close(w.exitCh)
w.wg.Wait()
}

// recalcRecommit recalculates the resubmitting interval upon feedback.
Expand All @@ -349,6 +353,7 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t

// newWorkLoop is a standalone goroutine to submit new mining work upon received events.
func (w *worker) newWorkLoop(recommit time.Duration) {
defer w.wg.Done()
var (
interrupt *int32
minRecommit = recommit // minimal resubmit interval specified by user.
Expand Down Expand Up @@ -446,6 +451,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) {

// mainLoop is a standalone goroutine to regenerate the sealing task based on the received event.
func (w *worker) mainLoop() {
defer w.wg.Done()
defer w.txsSub.Unsubscribe()
defer w.chainHeadSub.Unsubscribe()
defer w.chainSideSub.Unsubscribe()
Expand Down Expand Up @@ -548,6 +554,7 @@ func (w *worker) mainLoop() {
// taskLoop is a standalone goroutine to fetch sealing task from the generator and
// push them to consensus engine.
func (w *worker) taskLoop() {
defer w.wg.Done()
var (
stopCh chan struct{}
prev common.Hash
Expand Down Expand Up @@ -595,6 +602,7 @@ func (w *worker) taskLoop() {
// resultLoop is a standalone goroutine to handle sealing result submitting
// and flush relative data to the database.
func (w *worker) resultLoop() {
defer w.wg.Done()
for {
select {
case block := <-w.resultCh:
Expand Down