Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: some code enhancement on work.go #1174

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 0 additions & 78 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,13 @@ const (
// chainSideChanSize is the size of channel listening to ChainSideEvent.
chainSideChanSize = 10

// resubmitAdjustChanSize is the size of resubmitting interval adjustment channel.
resubmitAdjustChanSize = 10

// sealingLogAtDepth is the number of confirmations before logging successful mining.
sealingLogAtDepth = 11

// minRecommitInterval is the minimal time interval to recreate the sealing block with
// any newly arrived transactions.
minRecommitInterval = 1 * time.Second

// maxRecommitInterval is the maximum time interval to recreate the sealing block with
// any newly arrived transactions.
maxRecommitInterval = 15 * time.Second

// intervalAdjustRatio is the impact a single interval adjustment has on sealing work
// resubmitting interval.
intervalAdjustRatio = 0.1

// intervalAdjustBias is applied during the new resubmit interval calculation in favor of
// increasing upper limit or decreasing lower limit so that the limit can be reachable.
intervalAdjustBias = 200 * 1000.0 * 1000.0

// staleThreshold is the maximum depth of the acceptable stale block.
staleThreshold = 11
)
Expand Down Expand Up @@ -177,12 +162,6 @@ type getWorkReq struct {
result chan *types.Block
}

// intervalAdjust represents a resubmitting interval adjustment.
type intervalAdjust struct {
ratio float64
inc bool
}

// worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
type worker struct {
Expand Down Expand Up @@ -213,7 +192,6 @@ type worker struct {
startCh chan struct{}
exitCh chan struct{}
resubmitIntervalCh chan time.Duration
resubmitAdjustCh chan *intervalAdjust

wg sync.WaitGroup

Expand Down Expand Up @@ -279,7 +257,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
exitCh: make(chan struct{}),
startCh: make(chan struct{}, 1),
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
}
// Subscribe NewTxsEvent for tx pool
worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
Expand Down Expand Up @@ -396,28 +373,6 @@ func (w *worker) close() {
w.wg.Wait()
}

// recalcRecommit recalculates the resubmitting interval upon feedback.
func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) time.Duration {
var (
prevF = float64(prev.Nanoseconds())
next float64
)
if inc {
next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias)
max := float64(maxRecommitInterval.Nanoseconds())
if next > max {
next = max
}
} else {
next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias)
min := float64(minRecommit.Nanoseconds())
if next < min {
next = min
}
}
return time.Duration(int64(next))
}

// newWorkLoop is a standalone goroutine to submit new sealing work upon received events.
func (w *worker) newWorkLoop(recommit time.Duration) {
defer w.wg.Done()
Expand Down Expand Up @@ -508,23 +463,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
w.resubmitHook(minRecommit, recommit)
}

case adjust := <-w.resubmitAdjustCh:
// Adjust resubmit interval by feedback.
if adjust.inc {
before := recommit
target := float64(recommit.Nanoseconds()) / adjust.ratio
recommit = recalcRecommit(minRecommit, recommit, target, true)
log.Trace("Increase miner recommit interval", "from", before, "to", recommit)
} else {
before := recommit
recommit = recalcRecommit(minRecommit, recommit, float64(minRecommit.Nanoseconds()), false)
log.Trace("Decrease miner recommit interval", "from", before, "to", recommit)
}

if w.resubmitHook != nil {
w.resubmitHook(minRecommit, recommit)
}

case <-w.exitCh:
return
}
Expand Down Expand Up @@ -901,17 +839,6 @@ LOOP:
// For the first two cases, the semi-finished work will be discarded.
// For the third case, the semi-finished work will be submitted to the consensus engine.
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
ratio := float64(gasLimit-env.gasPool.Gas()) / float64(gasLimit)
if ratio < 0.1 {
ratio = 0.1
}
w.resubmitAdjustCh <- &intervalAdjust{
ratio: ratio,
inc: true,
}
}
return atomic.LoadInt32(interrupt) == commitInterruptNewHead
}
// If we don't have enough gas for any further transactions then we're done
Expand Down Expand Up @@ -998,11 +925,6 @@ LOOP:
}
w.pendingLogsFeed.Send(cpy)
}
// Notify resubmit loop to decrease resubmitting interval if current interval is larger
// than the user-specified one.
if interrupt != nil {
w.resubmitAdjustCh <- &intervalAdjust{inc: false}
}
return false
}

Expand Down
95 changes: 0 additions & 95 deletions miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"errors"
"math/big"
"math/rand"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -270,100 +269,6 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool) {
}
}

func TestAdjustIntervalEthash(t *testing.T) {
testAdjustInterval(t, ethashChainConfig, ethash.NewFaker())
}

func TestAdjustIntervalClique(t *testing.T) {
testAdjustInterval(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase()))
}

func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) {
defer engine.Close()

w, _ := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0)
defer w.close()

w.skipSealHook = func(task *task) bool {
return true
}
w.fullTaskHook = func() {
time.Sleep(100 * time.Millisecond)
}
var (
progress = make(chan struct{}, 10)
result = make([]float64, 0, 10)
index = 0
start uint32
)
w.resubmitHook = func(minInterval time.Duration, recommitInterval time.Duration) {
// Short circuit if interval checking hasn't started.
if atomic.LoadUint32(&start) == 0 {
return
}
var wantMinInterval, wantRecommitInterval time.Duration

switch index {
case 0:
wantMinInterval, wantRecommitInterval = 3*time.Second, 3*time.Second
case 1:
origin := float64(3 * time.Second.Nanoseconds())
estimate := origin*(1-intervalAdjustRatio) + intervalAdjustRatio*(origin/0.8+intervalAdjustBias)
wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(estimate)*time.Nanosecond
case 2:
estimate := result[index-1]
min := float64(3 * time.Second.Nanoseconds())
estimate = estimate*(1-intervalAdjustRatio) + intervalAdjustRatio*(min-intervalAdjustBias)
wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(estimate)*time.Nanosecond
case 3:
wantMinInterval, wantRecommitInterval = time.Second, time.Second
}

// Check interval
if minInterval != wantMinInterval {
t.Errorf("resubmit min interval mismatch: have %v, want %v ", minInterval, wantMinInterval)
}
if recommitInterval != wantRecommitInterval {
t.Errorf("resubmit interval mismatch: have %v, want %v", recommitInterval, wantRecommitInterval)
}
result = append(result, float64(recommitInterval.Nanoseconds()))
index += 1
progress <- struct{}{}
}
w.start()

time.Sleep(time.Second) // Ensure two tasks have been summitted due to start opt
atomic.StoreUint32(&start, 1)

w.setRecommitInterval(3 * time.Second)
select {
case <-progress:
case <-time.NewTimer(time.Second).C:
t.Error("interval reset timeout")
}

w.resubmitAdjustCh <- &intervalAdjust{inc: true, ratio: 0.8}
select {
case <-progress:
case <-time.NewTimer(time.Second).C:
t.Error("interval reset timeout")
}

w.resubmitAdjustCh <- &intervalAdjust{inc: false}
select {
case <-progress:
case <-time.NewTimer(time.Second).C:
t.Error("interval reset timeout")
}

w.setRecommitInterval(500 * time.Millisecond)
select {
case <-progress:
case <-time.NewTimer(time.Second).C:
t.Error("interval reset timeout")
}
}

func TestGetSealingWorkEthash(t *testing.T) {
testGetSealingWork(t, ethashChainConfig, ethash.NewFaker(), false)
}
Expand Down