From d874f8038843ffdfadb7e3216b3ea03b4187ebc5 Mon Sep 17 00:00:00 2001 From: Bui Quang Minh Date: Tue, 29 Oct 2024 14:38:57 +0700 Subject: [PATCH] miner/worker: speculatively apply next transaction in parallel On mining node, when executing the current transaction, we try to speculatively apply the next transaction in parallel to prefetch the storage. As a result, it can help to reduce the execution time of the next actual execution. --- cmd/ronin/main.go | 1 + cmd/utils/flags.go | 6 ++ core/types/transaction.go | 30 +++++++++ miner/miner.go | 1 + miner/ordering.go | 59 +++++++++++++++-- miner/ordering_test.go | 93 ++++++++++++++++++++++++++ miner/worker.go | 134 +++++++++++++++++++++++++++++++++++++- 7 files changed, 315 insertions(+), 9 deletions(-) diff --git a/cmd/ronin/main.go b/cmd/ronin/main.go index 5144bc14a9..dede24af65 100644 --- a/cmd/ronin/main.go +++ b/cmd/ronin/main.go @@ -142,6 +142,7 @@ var ( utils.MinerNoVerifyFlag, utils.MinerBlockProduceLeftoverFlag, utils.MinerBlockSizeReserveFlag, + utils.MinerNoSpeculation, utils.NATFlag, utils.NoDiscoverFlag, utils.DiscoveryV5Flag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 1a5377eeb0..3a2cec158b 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -597,6 +597,11 @@ var ( Value: ethconfig.Defaults.Miner.BlockSizeReserve, Category: flags.MinerCategory, } + MinerNoSpeculation = &cli.BoolFlag{ + Name: "miner.nospeculation", + Usage: "Disable speculatively committing next transaction", + Category: flags.MinerCategory, + } // Account settings UnlockedAccountFlag = &cli.StringFlag{ Name: "unlock", @@ -1775,6 +1780,7 @@ func setMiner(ctx *cli.Context, cfg *miner.Config) { if ctx.IsSet(LegacyMinerGasTargetFlag.Name) { log.Warn("The generic --miner.gastarget flag is deprecated and will be removed in the future!") } + cfg.NoSpeculation = ctx.Bool(MinerNoSpeculation.Name) } func setWhitelist(ctx *cli.Context, cfg *ethconfig.Config) { diff --git a/core/types/transaction.go b/core/types/transaction.go index d300c4adf6..f3ac72c3db 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -631,6 +631,36 @@ func NewMessage( } } +func NewMessageWithExpiredTimeAndPayer( + from common.Address, + to *common.Address, + nonce uint64, + amount *big.Int, + gasLimit uint64, + gasPrice, gasFeeCap, gasTipCap *big.Int, + data []byte, + accessList AccessList, + isFake bool, + expiredTime uint64, + payer common.Address, +) Message { + return Message{ + from: from, + to: to, + nonce: nonce, + amount: amount, + gasLimit: gasLimit, + gasPrice: gasPrice, + gasFeeCap: gasFeeCap, + gasTipCap: gasTipCap, + data: data, + accessList: accessList, + isFake: isFake, + expiredTime: expiredTime, + payer: payer, + } +} + // AsMessage returns the transaction as a core.Message. func (tx *Transaction) AsMessage(s Signer, baseFee *big.Int) (Message, error) { msg := Message{ diff --git a/miner/miner.go b/miner/miner.go index 719859b6d0..a8e42cab9e 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -55,6 +55,7 @@ type Config struct { Noverify bool // Disable remote mining solution verification(only useful in ethash). BlockProduceLeftOver time.Duration BlockSizeReserve uint64 + NoSpeculation bool } // Miner creates blocks and searches for proof-of-work values. diff --git a/miner/ordering.go b/miner/ordering.go index 7655a73528..a387704acf 100644 --- a/miner/ordering.go +++ b/miner/ordering.go @@ -47,13 +47,7 @@ type TxByPriceAndTime []*TxWithMinerFee func (s TxByPriceAndTime) Len() int { return len(s) } func (s TxByPriceAndTime) Less(i, j int) bool { - // If the prices are equal, use the time the transaction was first seen for - // deterministic sorting - cmp := s[i].minerFee.Cmp(s[j].minerFee) - if cmp == 0 { - return s[i].tx.Time.Before(s[j].tx.Time) - } - return cmp > 0 + return cmpPriceAndTime(s[i], s[j]) } func (s TxByPriceAndTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] } @@ -69,6 +63,20 @@ func (s *TxByPriceAndTime) Pop() interface{} { return x } +// cmpPriceAndTime compares 2 transactions by their miner fee and +// time first seen in txpool. +// Returns true if `a` has higher miner fee or appears in txpool +// before `b`. +func cmpPriceAndTime(a *TxWithMinerFee, b *TxWithMinerFee) bool { + // If the prices are equal, use the time the transaction was first seen for + // deterministic sorting + cmp := a.minerFee.Cmp(b.minerFee) + if cmp == 0 { + return a.tx.Time.Before(b.tx.Time) + } + return cmp > 0 +} + // TransactionsByPriceAndNonce represents a set of transactions that can return // transactions in a profit-maximizing sorted order, while supporting removing // entire batches of transactions for non-executable accounts. @@ -142,3 +150,40 @@ func (t *TransactionsByPriceAndNonce) Size() int { func (t *TransactionsByPriceAndNonce) Clear() { t.heads = TxByPriceAndTime{} } + +// Next return the potential next committed transaction so that we can speculative +// execute that transaction. As we don't know the result of current transaction yet, +// we don't know that the next operation is shift or pop. This function returns the +// largest among right, left children and the next transaction from the account at +// the head node. +func (t *TransactionsByPriceAndNonce) Next() (*txpool.LazyTransaction, *big.Int) { + heapSize := len(t.heads) + acc := t.heads[0].from + + var candidateTx *TxWithMinerFee + if txs, ok := t.txs[acc]; ok && len(txs) > 0 { + if tx, err := newTxWithMinerFee(txs[0], acc, t.baseFee); err == nil { + candidateTx = tx + } + } + + if heapSize >= 2 { + // left child + if candidateTx == nil || cmpPriceAndTime(t.heads[1], candidateTx) { + candidateTx = t.heads[1] + } + + if heapSize >= 3 { + // right child + if cmpPriceAndTime(t.heads[2], candidateTx) { + candidateTx = t.heads[2] + } + } + } + + if candidateTx != nil { + return candidateTx.tx, candidateTx.minerFee + } else { + return nil, nil + } +} diff --git a/miner/ordering_test.go b/miner/ordering_test.go index 87ce644a4c..786dc97edd 100644 --- a/miner/ordering_test.go +++ b/miner/ordering_test.go @@ -169,3 +169,96 @@ func TestTransactionTimeSort(t *testing.T) { } } } + +func TestTransactionsByPriceAndNonceNext(t *testing.T) { + keys := make([]*ecdsa.PrivateKey, 4) + addresses := make([]common.Address, 4) + for i := range keys { + keys[i], _ = crypto.GenerateKey() + addresses[i] = crypto.PubkeyToAddress(keys[i].PublicKey) + } + + signer := types.NewMikoSigner(big.NewInt(2020)) + + transactions := make([]*txpool.LazyTransaction, 4) + for i := range transactions { + tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, common.Big0, 21000, big.NewInt(int64(i)), nil), signer, keys[i]) + transactions[i] = &txpool.LazyTransaction{ + Tx: tx, + Hash: tx.Hash(), + Time: tx.Time(), + GasFeeCap: uint256.MustFromBig(tx.GasFeeCap()), + GasTipCap: uint256.MustFromBig(tx.GasTipCap()), + Gas: tx.Gas(), + BlobGas: tx.BlobGas(), + } + } + + // There is no next transaction + groups := make(map[common.Address][]*txpool.LazyTransaction) + groups[addresses[0]] = append(groups[addresses[0]], transactions[0]) + + txs := NewTransactionsByPriceAndNonce(signer, groups, nil) + lazyTx, _ := txs.Next() + if lazyTx != nil { + t.Fatalf("Expect no next transaction, got %v", lazyTx) + } + + // In heap, head transaction has gas price 1, child transaction has gas price 0 + groups = make(map[common.Address][]*txpool.LazyTransaction) + groups[addresses[0]] = append(groups[addresses[0]], transactions[0]) + groups[addresses[1]] = append(groups[addresses[1]], transactions[1]) + txs = NewTransactionsByPriceAndNonce(signer, groups, nil) + lazyTx, _ = txs.Next() + if lazyTx == nil || lazyTx.GasFeeCap.ToBig().Cmp(common.Big0) != 0 { + t.Fatalf("Expect to have next transaction of gas price 0, got: %v", lazyTx) + } + + // In heap, head transaction has gas price 2, children have gas price 1, 0 + // Next transaction must have gas price 1 + groups = make(map[common.Address][]*txpool.LazyTransaction) + groups[addresses[0]] = append(groups[addresses[0]], transactions[0]) + groups[addresses[1]] = append(groups[addresses[1]], transactions[1]) + groups[addresses[2]] = append(groups[addresses[2]], transactions[2]) + txs = NewTransactionsByPriceAndNonce(signer, groups, nil) + lazyTx, _ = txs.Next() + if lazyTx == nil || lazyTx.GasFeeCap.ToBig().Cmp(common.Big1) != 0 { + t.Fatalf("Expect to have next transaction of gas price 1, got: %v", lazyTx) + } + + // In heap, head transaction has gas price 3, children have gas price 2, 1 + // Next transaction must have gas price 2 + groups = make(map[common.Address][]*txpool.LazyTransaction) + groups[addresses[0]] = append(groups[addresses[0]], transactions[0]) + groups[addresses[1]] = append(groups[addresses[1]], transactions[1]) + groups[addresses[2]] = append(groups[addresses[2]], transactions[2]) + groups[addresses[3]] = append(groups[addresses[3]], transactions[3]) + txs = NewTransactionsByPriceAndNonce(signer, groups, nil) + lazyTx, _ = txs.Next() + if lazyTx == nil || lazyTx.GasFeeCap.ToBig().Cmp(common.Big2) != 0 { + t.Fatalf("Expect to have next transaction of gas price 2, got: %v", lazyTx) + } + + // The next transaction of head transaction has higher price than children in heap + nextTx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, common.Big0, 21000, big.NewInt(100), nil), signer, keys[3]) + tx := &txpool.LazyTransaction{ + Tx: nextTx, + Hash: nextTx.Hash(), + Time: nextTx.Time(), + GasFeeCap: uint256.MustFromBig(nextTx.GasFeeCap()), + GasTipCap: uint256.MustFromBig(nextTx.GasTipCap()), + Gas: nextTx.Gas(), + BlobGas: nextTx.BlobGas(), + } + + groups = make(map[common.Address][]*txpool.LazyTransaction) + groups[addresses[0]] = append(groups[addresses[0]], transactions[0]) + groups[addresses[1]] = append(groups[addresses[1]], transactions[1]) + groups[addresses[2]] = append(groups[addresses[2]], transactions[2]) + groups[addresses[3]] = append(groups[addresses[3]], transactions[3], tx) + txs = NewTransactionsByPriceAndNonce(signer, groups, nil) + lazyTx, _ = txs.Next() + if lazyTx == nil || lazyTx.GasFeeCap.ToBig().Cmp(big.NewInt(100)) != 0 { + t.Fatalf("Expect to have next transaction of gas price 100, got: %v", tx) + } +} diff --git a/miner/worker.go b/miner/worker.go index a9b829b71d..c98d96825e 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -26,6 +26,7 @@ import ( mapset "github.com/deckarep/golang-set" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/consortium" "github.com/ethereum/go-ethereum/consensus/misc" @@ -35,8 +36,10 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" lru "github.com/hashicorp/golang-lru/v2" @@ -91,6 +94,10 @@ const ( maxBlockSize = 10 * 1024 * 1024 // 10MB ) +var ( + workerCommitTransactionsTimer = metrics.NewRegisteredTimer("worker/commit", nil) +) + // environment is the worker's current environment and holds all of the current state information. type environment struct { signer types.Signer @@ -898,6 +905,82 @@ func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Addres return receipt.Logs, nil } +func (w *worker) nextTransaction( + selectedTxs *TransactionsByPriceAndNonce, + notSelectedTx *txpool.LazyTransaction, + notSelectedTip *big.Int, +) *types.Transaction { + tx, txTip := selectedTxs.Next() + var nextTx *txpool.LazyTransaction + if notSelectedTx == nil { + nextTx = tx + } else if tx == nil { + nextTx = tx + } else { + if txTip.Cmp(notSelectedTip) < 0 { + nextTx = notSelectedTx + } else { + nextTx = tx + } + } + + if nextTx != nil { + return nextTx.Resolve() + } + + return nil +} + +// speculativeCommitTransaction speculatively execute the potential next transaction +// to help prefetch the storage that is loaded in the actual execution +// As the function is called in a separate goroutine, there are 2 channels to communicate +// with it +// - interrupt: The caller can close this channel to ask this function to stop +// - done: The caller can watch this channel to know if this function returns +func (w *worker) speculativeCommitTransaction( + tx *types.Transaction, + coinbase common.Address, + throwawayState *state.StateDB, + header *types.Header, + interrupt <-chan struct{}, + done chan<- struct{}, +) { + from, _ := types.Sender(w.current.signer, tx) + payer := from + if tx.Type() == types.SponsoredTxType { + payer, _ = types.Payer(w.current.signer, tx) + } + msg := types.NewMessageWithExpiredTimeAndPayer( + from, + tx.To(), + tx.Nonce(), + tx.Value(), + tx.Gas(), + tx.GasPrice(), + tx.GasFeeCap(), + tx.GasTipCap(), + tx.Data(), + tx.AccessList(), + true, // fake message to avoid nonce check + tx.ExpiredTime(), + payer, + ) + + txContext := core.NewEVMTxContext(msg) + blockContext := core.NewEVMBlockContext(header, w.chain, &coinbase) + evm := vm.NewEVM(blockContext, txContext, throwawayState, w.chainConfig, *w.chain.GetVMConfig()) + + // This goroutine watches the interrupt channel and cancel the below possibly blocking ApplyMessage. + // The evm.Cancel just informs the EVM to stop, it can return before ApplyMessage returns. + go func() { + <-interrupt + evm.Cancel() + }() + + core.ApplyMessage(evm, msg, new(core.GasPool).AddGas(math.MaxUint64)) + close(done) +} + func (w *worker) commitTransactions(plainTxs, blobTxs *TransactionsByPriceAndNonce, coinbase common.Address, interrupt *int32) bool { // Short circuit if current is nil if w.current == nil { @@ -953,6 +1036,13 @@ func (w *worker) commitTransactions(plainTxs, blobTxs *TransactionsByPriceAndNon bloomProcessor := core.NewAsyncReceiptBloomGenerator(plainTxs.Size() + blobTxs.Size()) defer bloomProcessor.Close() + var ( + copiedHeader *types.Header + speculationDone chan struct{} + ) + if !w.config.NoSpeculation && w.isRunning() { + copiedHeader = types.CopyHeader(w.current.header) + } Loop: for { if timer != nil { @@ -1001,8 +1091,10 @@ Loop: // Retrieve the next transaction and abort if all done var ( - selectedTx *txpool.LazyTransaction - selectedTxs *TransactionsByPriceAndNonce + selectedTx *txpool.LazyTransaction + selectedTxs *TransactionsByPriceAndNonce + notSelectedTx *txpool.LazyTransaction + notSelectedTip *big.Int ) plainTx, plainTip := plainTxs.Peek() blobTx, blobTip := blobTxs.Peek() @@ -1013,8 +1105,12 @@ Loop: } else { if plainTip.Cmp(blobTip) < 0 { selectedTx, selectedTxs = blobTx, blobTxs + notSelectedTx = plainTx + notSelectedTip = plainTip } else { selectedTx, selectedTxs = plainTx, plainTxs + notSelectedTx = blobTx + notSelectedTip = blobTip } } @@ -1057,6 +1153,32 @@ Loop: txs.Pop() continue } + + // Speculatively execute the next transaction + var interruptSpeculation chan struct{} + if !w.config.NoSpeculation && w.isRunning() { + interruptSpeculation = make(chan struct{}) + tx := w.nextTransaction(selectedTxs, notSelectedTx, notSelectedTip) + if tx != nil { + startSpeculation := true + if speculationDone != nil { + select { + case <-speculationDone: + default: + // Don't start new speculation when the old speculation has not finished yet + startSpeculation = false + } + } + if startSpeculation { + speculationDone = make(chan struct{}) + // FIXME: Does this cause large overhead? Copy is a quite heavy operation. + throwawayState := w.current.state.Copy() + go w.speculativeCommitTransaction(tx, coinbase, throwawayState, copiedHeader, + interruptSpeculation, speculationDone) + } + } + } + // Start executing the transaction w.current.state.SetTxContext(tx.Hash(), w.current.tcount) @@ -1093,6 +1215,12 @@ Loop: log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) txs.Shift() } + + // We are preparing to actually execute the next transaction, + // stop the speculation if it has not stopped yet. + if !w.config.NoSpeculation && w.isRunning() { + close(interruptSpeculation) + } } if !w.isRunning() && len(coalescedLogs) > 0 { @@ -1277,6 +1405,7 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) localBlobTxs[account] = txs } } + start := time.Now() if len(localPlainTxs) > 0 || len(localBlobTxs) > 0 { plainTxs := NewTransactionsByPriceAndNonce(w.current.signer, localPlainTxs, header.BaseFee) blobTxs := NewTransactionsByPriceAndNonce(w.current.signer, localBlobTxs, header.BaseFee) @@ -1291,6 +1420,7 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) return } } + workerCommitTransactionsTimer.Update(time.Since(start)) w.commit(uncles, w.fullTaskHook, true, tstart) }