Skip to content

Commit

Permalink
core/txpool, eth, miner: retrieve plain and blob txs separately (#29026)
Browse files Browse the repository at this point in the history
* core/txpool, eth, miner: retrieve plain and blob txs separately

* core/txpool: fix typo, no farming

* miner: farm all the typos

Co-authored-by: Martin HS <[email protected]>

---------

Co-authored-by: Martin HS <[email protected]>
  • Loading branch information
karalabe and holiman authored Feb 20, 2024
1 parent ac0ff04 commit f4852b8
Show file tree
Hide file tree
Showing 12 changed files with 125 additions and 50 deletions.
19 changes: 12 additions & 7 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1446,7 +1446,12 @@ func (p *BlobPool) drop() {
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (p *BlobPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction {
func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
// If only plain transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if filter.OnlyPlainTxs {
return nil
}
// Track the amount of time waiting to retrieve the list of pending blob txs
// from the pool and the amount of time actually spent on assembling the data.
// The latter will be pretty much moot, but we've kept it to have symmetric
Expand All @@ -1466,20 +1471,20 @@ func (p *BlobPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *u
lazies := make([]*txpool.LazyTransaction, 0, len(txs))
for _, tx := range txs {
// If transaction filtering was requested, discard badly priced ones
if minTip != nil && baseFee != nil {
if tx.execFeeCap.Lt(baseFee) {
if filter.MinTip != nil && filter.BaseFee != nil {
if tx.execFeeCap.Lt(filter.BaseFee) {
break // basefee too low, cannot be included, discard rest of txs from the account
}
tip := new(uint256.Int).Sub(tx.execFeeCap, baseFee)
tip := new(uint256.Int).Sub(tx.execFeeCap, filter.BaseFee)
if tip.Gt(tx.execTipCap) {
tip = tx.execTipCap
}
if tip.Lt(minTip) {
if tip.Lt(filter.MinTip) {
break // allowed or remaining tip too low, cannot be included, discard rest of txs from the account
}
}
if blobFee != nil {
if tx.blobFeeCap.Lt(blobFee) {
if filter.BlobFee != nil {
if tx.blobFeeCap.Lt(filter.BlobFee) {
break // blobfee too low, cannot be included, discard rest of txs from the account
}
}
Expand Down
6 changes: 5 additions & 1 deletion core/txpool/blobpool/blobpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1340,7 +1340,11 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
b.ReportAllocs()

for i := 0; i < b.N; i++ {
p := pool.Pending(uint256.NewInt(1), chain.basefee, chain.blobfee)
p := pool.Pending(txpool.PendingFilter{
MinTip: uint256.NewInt(1),
BaseFee: chain.basefee,
BlobFee: chain.blobfee,
})
if len(p) != int(capacity) {
b.Fatalf("have %d want %d", len(p), capacity)
}
Expand Down
16 changes: 10 additions & 6 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,12 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (pool *LegacyPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction {
func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
// If only blob transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if filter.OnlyBlobTxs {
return nil
}
pool.mu.Lock()
defer pool.mu.Unlock()

Expand All @@ -531,13 +536,12 @@ func (pool *LegacyPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobF
minTipBig *big.Int
baseFeeBig *big.Int
)
if minTip != nil {
minTipBig = minTip.ToBig()
if filter.MinTip != nil {
minTipBig = filter.MinTip.ToBig()
}
if baseFee != nil {
baseFeeBig = baseFee.ToBig()
if filter.BaseFee != nil {
baseFeeBig = filter.BaseFee.ToBig()
}

pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, list := range pool.pending {
txs := list.Flatten()
Expand Down
17 changes: 16 additions & 1 deletion core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,21 @@ type LazyResolver interface {
// may request (and relinquish) exclusive access to certain addresses.
type AddressReserver func(addr common.Address, reserve bool) error

// PendingFilter is a collection of filter rules to allow retrieving a subset
// of transactions for announcement or mining.
//
// Note, the entries here are not arbitrary useful filters, rather each one has
// a very specific call site in mind and each one can be evaluated very cheaply
// by the pool implementations. Only add new ones that satisfy those constraints.
type PendingFilter struct {
MinTip *uint256.Int // Minimum miner tip required to include a transaction
BaseFee *uint256.Int // Minimum 1559 basefee needed to include a transaction
BlobFee *uint256.Int // Minimum 4844 blobfee needed to include a blob transaction

OnlyPlainTxs bool // Return only plain EVM transactions (peer-join announces, block space filling)
OnlyBlobTxs bool // Return only blob transactions (block blob-space filling)
}

// SubPool represents a specialized transaction pool that lives on its own (e.g.
// blob pool). Since independent of how many specialized pools we have, they do
// need to be updated in lockstep and assemble into one coherent view for block
Expand Down Expand Up @@ -118,7 +133,7 @@ type SubPool interface {
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*LazyTransaction
Pending(filter PendingFilter) map[common.Address][]*LazyTransaction

// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions
Expand Down
5 changes: 2 additions & 3 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/holiman/uint256"
)

// TxStatus is the current status of a transaction as seen by the pool.
Expand Down Expand Up @@ -357,10 +356,10 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (p *TxPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*LazyTransaction {
func (p *TxPool) Pending(filter PendingFilter) map[common.Address][]*LazyTransaction {
txs := make(map[common.Address][]*LazyTransaction)
for _, subpool := range p.subpools {
for addr, set := range subpool.Pending(minTip, baseFee, blobFee) {
for addr, set := range subpool.Pending(filter) {
txs[addr] = set
}
}
Expand Down
2 changes: 1 addition & 1 deletion eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction)
}

func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
pending := b.eth.txPool.Pending(nil, nil, nil)
pending := b.eth.txPool.Pending(txpool.PendingFilter{})
var txs types.Transactions
for _, batch := range pending {
for _, lazy := range batch {
Expand Down
5 changes: 3 additions & 2 deletions eth/catalyst/simulated_beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -263,7 +264,7 @@ func (c *SimulatedBeacon) Rollback() {

// Fork sets the head to the provided hash.
func (c *SimulatedBeacon) Fork(parentHash common.Hash) error {
if len(c.eth.TxPool().Pending(nil, nil, nil)) != 0 {
if len(c.eth.TxPool().Pending(txpool.PendingFilter{})) != 0 {
return errors.New("pending block dirty")
}
parent := c.eth.BlockChain().GetBlockByHash(parentHash)
Expand All @@ -275,7 +276,7 @@ func (c *SimulatedBeacon) Fork(parentHash common.Hash) error {

// AdjustTime creates a new block with an adjusted timestamp.
func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error {
if len(c.eth.TxPool().Pending(nil, nil, nil)) != 0 {
if len(c.eth.TxPool().Pending(txpool.PendingFilter{})) != 0 {
return errors.New("could not adjust time on non-empty block")
}
parent := c.eth.BlockChain().CurrentBlock()
Expand Down
3 changes: 1 addition & 2 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/triedb/pathdb"
"github.com/holiman/uint256"
)

const (
Expand Down Expand Up @@ -74,7 +73,7 @@ type txPool interface {

// Pending should return pending transactions.
// The slice should be modifiable by the caller.
Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction
Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction

// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions
Expand Down
2 changes: 1 addition & 1 deletion eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (p *testTxPool) Add(txs []*types.Transaction, local bool, sync bool) []erro
}

// Pending returns all the transactions known to the pool
func (p *testTxPool) Pending(minTip *uint256.Int, baseFee *uint256.Int, blobFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction {
func (p *testTxPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
p.lock.RLock()
defer p.lock.RUnlock()

Expand Down
3 changes: 2 additions & 1 deletion eth/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/log"
Expand All @@ -36,7 +37,7 @@ const (
// syncTransactions starts sending all currently pending transactions to the given peer.
func (h *handler) syncTransactions(p *eth.Peer) {
var hashes []common.Hash
for _, batch := range h.txpool.Pending(nil, nil, nil) {
for _, batch := range h.txpool.Pending(txpool.PendingFilter{OnlyPlainTxs: true}) {
for _, tx := range batch {
hashes = append(hashes, tx.Hash)
}
Expand Down
11 changes: 11 additions & 0 deletions miner/ordering.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,14 @@ func (t *transactionsByPriceAndNonce) Shift() {
func (t *transactionsByPriceAndNonce) Pop() {
heap.Pop(&t.heads)
}

// Empty returns if the price heap is empty. It can be used to check it simpler
// than calling peek and checking for nil return.
func (t *transactionsByPriceAndNonce) Empty() bool {
return len(t.heads) == 0
}

// Clear removes the entire content of the heap.
func (t *transactionsByPriceAndNonce) Clear() {
t.heads, t.txs = nil, nil
}
86 changes: 61 additions & 25 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,9 +562,11 @@ func (w *worker) mainLoop() {
BlobGas: tx.BlobGas(),
})
}
txset := newTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee)
plainTxs := newTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee) // Mixed bag of everrything, yolo
blobTxs := newTransactionsByPriceAndNonce(w.current.signer, nil, w.current.header.BaseFee) // Empty bag, don't bother optimising

tcount := w.current.tcount
w.commitTransactions(w.current, txset, nil, new(uint256.Int))
w.commitTransactions(w.current, plainTxs, blobTxs, nil)

// Only update the snapshot if any new transactions were added
// to the pending block
Expand Down Expand Up @@ -802,7 +804,7 @@ func (w *worker) applyTransaction(env *environment, tx *types.Transaction) (*typ
return receipt, err
}

func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAndNonce, interrupt *atomic.Int32, minTip *uint256.Int) error {
func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, interrupt *atomic.Int32) error {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand All @@ -821,8 +823,33 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
break
}
// If we don't have enough blob space for any further blob transactions,
// skip that list altogether
if !blobTxs.Empty() && env.blobs*params.BlobTxBlobGasPerBlob >= params.MaxBlobGasPerBlock {
log.Trace("Not enough blob space for further blob transactions")
blobTxs.Clear()
// Fall though to pick up any plain txs
}
// Retrieve the next transaction and abort if all done.
ltx, tip := txs.Peek()
var (
ltx *txpool.LazyTransaction
txs *transactionsByPriceAndNonce
)
pltx, ptip := plainTxs.Peek()
bltx, btip := blobTxs.Peek()

switch {
case pltx == nil:
txs, ltx = blobTxs, bltx
case bltx == nil:
txs, ltx = plainTxs, pltx
default:
if ptip.Lt(btip) {
txs, ltx = blobTxs, bltx
} else {
txs, ltx = plainTxs, pltx
}
}
if ltx == nil {
break
}
Expand All @@ -837,11 +864,6 @@ func (w *worker) commitTransactions(env *environment, txs *transactionsByPriceAn
txs.Pop()
continue
}
// If we don't receive enough tip for the next transaction, skip the account
if tip.Cmp(minTip) < 0 {
log.Trace("Not enough tip for transaction", "hash", ltx.Hash, "tip", tip, "needed", minTip)
break // If the next-best is too low, surely no better will be available
}
// Transaction seems to fit, pull it up from the pool
tx := ltx.Resolve()
if tx == nil {
Expand Down Expand Up @@ -1005,35 +1027,49 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err
w.mu.RUnlock()

// Retrieve the pending transactions pre-filtered by the 1559/4844 dynamic fees
var baseFee *uint256.Int
filter := txpool.PendingFilter{
MinTip: tip,
}
if env.header.BaseFee != nil {
baseFee = uint256.MustFromBig(env.header.BaseFee)
filter.BaseFee = uint256.MustFromBig(env.header.BaseFee)
}
var blobFee *uint256.Int
if env.header.ExcessBlobGas != nil {
blobFee = uint256.MustFromBig(eip4844.CalcBlobFee(*env.header.ExcessBlobGas))
filter.BlobFee = uint256.MustFromBig(eip4844.CalcBlobFee(*env.header.ExcessBlobGas))
}
pending := w.eth.TxPool().Pending(tip, baseFee, blobFee)
filter.OnlyPlainTxs, filter.OnlyBlobTxs = true, false
pendingPlainTxs := w.eth.TxPool().Pending(filter)

filter.OnlyPlainTxs, filter.OnlyBlobTxs = false, true
pendingBlobTxs := w.eth.TxPool().Pending(filter)

// Split the pending transactions into locals and remotes.
localTxs, remoteTxs := make(map[common.Address][]*txpool.LazyTransaction), pending
localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs
localBlobTxs, remoteBlobTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingBlobTxs

for _, account := range w.eth.TxPool().Locals() {
if txs := remoteTxs[account]; len(txs) > 0 {
delete(remoteTxs, account)
localTxs[account] = txs
if txs := remotePlainTxs[account]; len(txs) > 0 {
delete(remotePlainTxs, account)
localPlainTxs[account] = txs
}
if txs := remoteBlobTxs[account]; len(txs) > 0 {
delete(remoteBlobTxs, account)
localBlobTxs[account] = txs
}
}

// Fill the block with all available pending transactions.
if len(localTxs) > 0 {
txs := newTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
if err := w.commitTransactions(env, txs, interrupt, new(uint256.Int)); err != nil {
if len(localPlainTxs) > 0 || len(localBlobTxs) > 0 {
plainTxs := newTransactionsByPriceAndNonce(env.signer, localPlainTxs, env.header.BaseFee)
blobTxs := newTransactionsByPriceAndNonce(env.signer, localBlobTxs, env.header.BaseFee)

if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil {
return err
}
}
if len(remoteTxs) > 0 {
txs := newTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
if err := w.commitTransactions(env, txs, interrupt, tip); err != nil {
if len(remotePlainTxs) > 0 || len(remoteBlobTxs) > 0 {
plainTxs := newTransactionsByPriceAndNonce(env.signer, remotePlainTxs, env.header.BaseFee)
blobTxs := newTransactionsByPriceAndNonce(env.signer, remoteBlobTxs, env.header.BaseFee)

if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil {
return err
}
}
Expand Down

0 comments on commit f4852b8

Please sign in to comment.