Skip to content

Commit

Permalink
worker: remove the code of resubmit interval adjust
Browse files Browse the repository at this point in the history
resubmit intervalAdjust is for PoW only, to remove it to make worker simpler.

With PoW, there will be a periodic timer to check if it is the time to stop
packing transaction and start calculating the desired hash value, since other miner
could succeed in hash compute if it spends too much time packing transactions.
It will commit the current fruit to calculate root at a reasonable time.
And it will schedule a new work to get a big block if new transaction was received.

When there are too many transactions in the TxPool, the interval of the resubmit timer would be
increased and vice versa.

But it is not needed with PoS related consensus, since the block interval is determined in PoS,
and there is already a timer to stop too long packing.
  • Loading branch information
setunapo committed Nov 10, 2022
1 parent 8b7b87c commit 5eff61c
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 173 deletions.
78 changes: 0 additions & 78 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,13 @@ const (
// chainSideChanSize is the size of channel listening to ChainSideEvent.
chainSideChanSize = 10

// resubmitAdjustChanSize is the size of resubmitting interval adjustment channel.
resubmitAdjustChanSize = 10

// sealingLogAtDepth is the number of confirmations before logging successful mining.
sealingLogAtDepth = 11

// minRecommitInterval is the minimal time interval to recreate the sealing block with
// any newly arrived transactions.
minRecommitInterval = 1 * time.Second

// maxRecommitInterval is the maximum time interval to recreate the sealing block with
// any newly arrived transactions.
maxRecommitInterval = 15 * time.Second

// intervalAdjustRatio is the impact a single interval adjustment has on sealing work
// resubmitting interval.
intervalAdjustRatio = 0.1

// intervalAdjustBias is applied during the new resubmit interval calculation in favor of
// increasing upper limit or decreasing lower limit so that the limit can be reachable.
intervalAdjustBias = 200 * 1000.0 * 1000.0

// staleThreshold is the maximum depth of the acceptable stale block.
staleThreshold = 11
)
Expand Down Expand Up @@ -177,12 +162,6 @@ type getWorkReq struct {
result chan *types.Block
}

// intervalAdjust represents a resubmitting interval adjustment.
type intervalAdjust struct {
ratio float64
inc bool
}

// worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
type worker struct {
Expand Down Expand Up @@ -213,7 +192,6 @@ type worker struct {
startCh chan struct{}
exitCh chan struct{}
resubmitIntervalCh chan time.Duration
resubmitAdjustCh chan *intervalAdjust

wg sync.WaitGroup

Expand Down Expand Up @@ -279,7 +257,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
exitCh: make(chan struct{}),
startCh: make(chan struct{}, 1),
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
}
// Subscribe NewTxsEvent for tx pool
worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
Expand Down Expand Up @@ -396,28 +373,6 @@ func (w *worker) close() {
w.wg.Wait()
}

// recalcRecommit recalculates the resubmitting interval upon feedback.
func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) time.Duration {
var (
prevF = float64(prev.Nanoseconds())
next float64
)
if inc {
next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias)
max := float64(maxRecommitInterval.Nanoseconds())
if next > max {
next = max
}
} else {
next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias)
min := float64(minRecommit.Nanoseconds())
if next < min {
next = min
}
}
return time.Duration(int64(next))
}

// newWorkLoop is a standalone goroutine to submit new sealing work upon received events.
func (w *worker) newWorkLoop(recommit time.Duration) {
defer w.wg.Done()
Expand Down Expand Up @@ -508,23 +463,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
w.resubmitHook(minRecommit, recommit)
}

case adjust := <-w.resubmitAdjustCh:
// Adjust resubmit interval by feedback.
if adjust.inc {
before := recommit
target := float64(recommit.Nanoseconds()) / adjust.ratio
recommit = recalcRecommit(minRecommit, recommit, target, true)
log.Trace("Increase miner recommit interval", "from", before, "to", recommit)
} else {
before := recommit
recommit = recalcRecommit(minRecommit, recommit, float64(minRecommit.Nanoseconds()), false)
log.Trace("Decrease miner recommit interval", "from", before, "to", recommit)
}

if w.resubmitHook != nil {
w.resubmitHook(minRecommit, recommit)
}

case <-w.exitCh:
return
}
Expand Down Expand Up @@ -901,17 +839,6 @@ LOOP:
// For the first two cases, the semi-finished work will be discarded.
// For the third case, the semi-finished work will be submitted to the consensus engine.
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
ratio := float64(gasLimit-env.gasPool.Gas()) / float64(gasLimit)
if ratio < 0.1 {
ratio = 0.1
}
w.resubmitAdjustCh <- &intervalAdjust{
ratio: ratio,
inc: true,
}
}
return atomic.LoadInt32(interrupt) == commitInterruptNewHead
}
// If we don't have enough gas for any further transactions then we're done
Expand Down Expand Up @@ -998,11 +925,6 @@ LOOP:
}
w.pendingLogsFeed.Send(cpy)
}
// Notify resubmit loop to decrease resubmitting interval if current interval is larger
// than the user-specified one.
if interrupt != nil {
w.resubmitAdjustCh <- &intervalAdjust{inc: false}
}
return false
}

Expand Down
95 changes: 0 additions & 95 deletions miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"errors"
"math/big"
"math/rand"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -270,100 +269,6 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool) {
}
}

func TestAdjustIntervalEthash(t *testing.T) {
testAdjustInterval(t, ethashChainConfig, ethash.NewFaker())
}

func TestAdjustIntervalClique(t *testing.T) {
testAdjustInterval(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase()))
}

func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) {
defer engine.Close()

w, _ := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0)
defer w.close()

w.skipSealHook = func(task *task) bool {
return true
}
w.fullTaskHook = func() {
time.Sleep(100 * time.Millisecond)
}
var (
progress = make(chan struct{}, 10)
result = make([]float64, 0, 10)
index = 0
start uint32
)
w.resubmitHook = func(minInterval time.Duration, recommitInterval time.Duration) {
// Short circuit if interval checking hasn't started.
if atomic.LoadUint32(&start) == 0 {
return
}
var wantMinInterval, wantRecommitInterval time.Duration

switch index {
case 0:
wantMinInterval, wantRecommitInterval = 3*time.Second, 3*time.Second
case 1:
origin := float64(3 * time.Second.Nanoseconds())
estimate := origin*(1-intervalAdjustRatio) + intervalAdjustRatio*(origin/0.8+intervalAdjustBias)
wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(estimate)*time.Nanosecond
case 2:
estimate := result[index-1]
min := float64(3 * time.Second.Nanoseconds())
estimate = estimate*(1-intervalAdjustRatio) + intervalAdjustRatio*(min-intervalAdjustBias)
wantMinInterval, wantRecommitInterval = 3*time.Second, time.Duration(estimate)*time.Nanosecond
case 3:
wantMinInterval, wantRecommitInterval = time.Second, time.Second
}

// Check interval
if minInterval != wantMinInterval {
t.Errorf("resubmit min interval mismatch: have %v, want %v ", minInterval, wantMinInterval)
}
if recommitInterval != wantRecommitInterval {
t.Errorf("resubmit interval mismatch: have %v, want %v", recommitInterval, wantRecommitInterval)
}
result = append(result, float64(recommitInterval.Nanoseconds()))
index += 1
progress <- struct{}{}
}
w.start()

time.Sleep(time.Second) // Ensure two tasks have been summitted due to start opt
atomic.StoreUint32(&start, 1)

w.setRecommitInterval(3 * time.Second)
select {
case <-progress:
case <-time.NewTimer(time.Second).C:
t.Error("interval reset timeout")
}

w.resubmitAdjustCh <- &intervalAdjust{inc: true, ratio: 0.8}
select {
case <-progress:
case <-time.NewTimer(time.Second).C:
t.Error("interval reset timeout")
}

w.resubmitAdjustCh <- &intervalAdjust{inc: false}
select {
case <-progress:
case <-time.NewTimer(time.Second).C:
t.Error("interval reset timeout")
}

w.setRecommitInterval(500 * time.Millisecond)
select {
case <-progress:
case <-time.NewTimer(time.Second).C:
t.Error("interval reset timeout")
}
}

func TestGetSealingWorkEthash(t *testing.T) {
testGetSealingWork(t, ethashChainConfig, ethash.NewFaker(), false)
}
Expand Down

0 comments on commit 5eff61c

Please sign in to comment.