Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

interrupting commit experiment #556

Merged
merged 17 commits into from
Mar 20, 2023
66 changes: 59 additions & 7 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,9 @@ func (w *worker) mainLoop(ctx context.Context) {
}
txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee)
tcount := w.current.tcount
w.commitTransactions(w.current, txset, nil)

interruptCh, _ := getInterruptCh(w.current.header.Number.Uint64())
w.commitTransactions(w.current, txset, nil, interruptCh)

// Only update the snapshot if any new transactions were added
// to the pending block
Expand Down Expand Up @@ -911,7 +913,8 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]*
return receipt.Logs, nil
}

func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) bool {
//nolint:gocognit
func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32, interruptCh chan struct{}) bool {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand All @@ -937,6 +940,13 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
inc: true,
}
}

select {
case <-interruptCh:
break
default:
}

return atomic.LoadInt32(interrupt) == commitInterruptNewHead
}
// If we don't have enough gas for any further transactions then we're done
Expand Down Expand Up @@ -1120,7 +1130,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
// fillTransactions retrieves the pending transactions from the txpool and fills them
// into the given sealing block. The transaction selection and ordering strategy can
// be customized with the plugin in the future.
func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *environment) {
func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *environment, interruptCh chan struct{}) {
ctx, span := tracing.StartSpan(ctx, "fillTransactions")
defer tracing.EndSpan(span)

Expand Down Expand Up @@ -1174,7 +1184,7 @@ func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *en
})

tracing.Exec(ctx, "worker.LocalCommitTransactions", func(ctx context.Context, span trace.Span) {
committed = w.commitTransactions(env, txs, interrupt)
committed = w.commitTransactions(env, txs, interrupt, interruptCh)
})

if committed {
Expand All @@ -1197,7 +1207,7 @@ func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *en
})

tracing.Exec(ctx, "worker.RemoteCommitTransactions", func(ctx context.Context, span trace.Span) {
committed = w.commitTransactions(env, txs, interrupt)
committed = w.commitTransactions(env, txs, interrupt, interruptCh)
})

if committed {
Expand All @@ -1222,14 +1232,50 @@ func (w *worker) generateWork(ctx context.Context, params *generateParams) (*typ
}
defer work.discard()

w.fillTransactions(ctx, nil, work)
interruptCh, _ := getInterruptCh(work.header.Number.Uint64())

w.fillTransactions(ctx, nil, work, interruptCh)

return w.engine.FinalizeAndAssemble(ctx, w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
}

var interruptMap = make(map[uint64]chan struct{})
var interruptMapMu sync.Mutex

func getInterruptCh(blockNumber uint64) (chan struct{}, bool) {
interruptMapMu.Lock()

interruptMapCh, ok := interruptMap[blockNumber]
if !ok {
interruptMapCh = make(chan struct{})
interruptMap[blockNumber] = interruptMapCh
}

interruptMapMu.Unlock()

fmt.Println("====", blockNumber, !ok)

return interruptMapCh, !ok
}

// commitWork generates several new sealing tasks based on the parent block
// and submit them to the sealer.
func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool, timestamp int64) {
if !noempty && atomic.LoadUint32(&w.noempty) == 0 {
interruptCh, isNew := getInterruptCh(w.chain.CurrentBlock().NumberU64() + 1)
if !isNew {
select {
case <-interruptCh:
// nothing to do
default:
fmt.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
close(interruptCh)
}

//return
}
}

start := time.Now()

var (
Expand Down Expand Up @@ -1277,7 +1323,9 @@ func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool,
}

// Fill pending transactions from the txpool
w.fillTransactions(ctx, interrupt, work)
interruptCh, _ := getInterruptCh(work.header.Number.Uint64())

w.fillTransactions(ctx, interrupt, work, interruptCh)

err = w.commit(ctx, work.copy(), w.fullTaskHook, true, start)
if err != nil {
Expand Down Expand Up @@ -1325,6 +1373,10 @@ func (w *worker) commit(ctx context.Context, env *environment, interval func(),
return err
}

interruptMapMu.Lock()
delete(interruptMap, block.Number().Uint64())
interruptMapMu.Unlock()

// If we're post merge, just ignore
if !w.isTTDReached(block.Header()) {
select {
Expand Down