Skip to content

Commit

Permalink
perf: batch Add to txpool from api sent tx
Browse files Browse the repository at this point in the history
  • Loading branch information
bnoieh committed Dec 9, 2024
1 parent 7948afc commit 79e9a32
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 4 deletions.
2 changes: 2 additions & 0 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,8 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
return item
}

func (p *BlobPool) AddSingle(tx *types.Transaction, errCh chan error) {}

// Add inserts a set of blob transactions into the pool if they pass validation (both
// consensus validity and pool restrictions).
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
Expand Down
9 changes: 6 additions & 3 deletions core/txpool/bundlepool/bundlepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"container/heap"
"context"
"errors"
mapset "github.com/deckarep/golang-set/v2"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/rpc"
"math/big"
"sync"
"time"

mapset "github.com/deckarep/golang-set/v2"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/rpc"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
Expand Down Expand Up @@ -289,6 +290,8 @@ func (p *BundlePool) Get(hash common.Hash) *types.Transaction {
return nil
}

func (p *BundlePool) AddSingle(tx *types.Transaction, errCh chan error) {
}
// Add enqueues a batch of transactions into the pool if they are valid. Due
// to the large transaction churn, add may postpone fully integrating the tx
// to a later point to batch multiple ones together.
Expand Down
66 changes: 66 additions & 0 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,11 @@ func (config *Config) sanitize() Config {
return conf
}

type AddTxReq struct {
Tx *types.Transaction
ErrCh chan error
}

// LegacyPool contains all currently known transactions. Transactions
// enter the pool when they are received from the network or submitted
// locally. They exit the pool when they are included in the blockchain.
Expand Down Expand Up @@ -308,6 +313,8 @@ type LegacyPool struct {
changesSinceReorg int // A counter for how many drops we've performed in-between reorg.

l1CostFn txpool.L1CostFunc // To apply L1 costs as rollup, optional field, may be nil.

addTxCh chan AddTxReq
}

type txpoolResetRequest struct {
Expand Down Expand Up @@ -337,6 +344,7 @@ func New(config Config, chain BlockChain) *LegacyPool {
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
pendingCache: newCacheForMiner(),
addTxCh: make(chan AddTxReq, 20000),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
Expand Down Expand Up @@ -408,6 +416,9 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
}
pool.wg.Add(1)
go pool.loop()

pool.wg.Add(1)
go pool.addTxLoop()
return nil
}

Expand Down Expand Up @@ -1182,6 +1193,10 @@ func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error {
return pool.Add([]*types.Transaction{tx}, false, true)[0]
}

func (pool *LegacyPool) AddSingle(tx *types.Transaction, errCh chan error) {
pool.addTxCh <- AddTxReq{tx, errCh}
}

// Add enqueues a batch of transactions into the pool if they are valid. Depending
// on the local flag, full pricing constraints will or will not be applied.
//
Expand Down Expand Up @@ -1417,6 +1432,57 @@ func (pool *LegacyPool) queueTxEvent(tx *types.Transaction) {
}
}

func (pool *LegacyPool) addTxLoop() {
defer pool.wg.Done()
batchTxs := make([]AddTxReq, 0, 200)
batchTxsCh := make(chan []AddTxReq, 100)
lastAddTime := time.Now()
idleTicker := time.NewTicker(50 * time.Millisecond)

for i := 0; i < 25; i++ {
go func() {
for {
select {
case txReqs := <-batchTxsCh:
batchTxs := make([]*types.Transaction, 0, 200)
for _, txReq := range txReqs {
batchTxs = append(batchTxs, txReq.Tx)
}
errs := pool.Add(batchTxs, true, false)
for i, txReq := range txReqs {
txReq.ErrCh <- errs[i]
}
case <-pool.reorgShutdownCh:
return
}
}
}()
}

for {
select {
case txReq := <-pool.addTxCh:
if len(batchTxs) == 200 {
batchTxsCh <- batchTxs
batchTxs = make([]AddTxReq, 0, 200)
lastAddTime = time.Now()
}
batchTxs = append(batchTxs, txReq)
case <-idleTicker.C:
if time.Since(lastAddTime) >= 50*time.Millisecond {
if len(batchTxs) > 0 {
batchTxsCh <- batchTxs
batchTxs = make([]AddTxReq, 0, 200)
lastAddTime = time.Now()
}
}
case <-pool.reorgShutdownCh:
idleTicker.Stop()
return
}
}
}

// scheduleReorgLoop schedules runs of reset and promoteExecutables. Code above should not
// call those methods directly, but request them being run using requestReset and
// requestPromoteExecutables instead.
Expand Down
1 change: 1 addition & 0 deletions core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ type SubPool interface {
// to the large transaction churn, add may postpone fully integrating the tx
// to a later point to batch multiple ones together.
Add(txs []*types.Transaction, local bool, sync bool) []error
AddSingle(tx *types.Transaction, errCh chan error)

// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce.
Expand Down
8 changes: 8 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,14 @@ func (p *TxPool) Get(hash common.Hash) *types.Transaction {
return nil
}

func (p *TxPool) AddSingle(tx *types.Transaction, errCh chan error) {
for _, subpool := range p.subpools {
if subpool.Filter(tx) {
subpool.AddSingle(tx, errCh)
}
}
}

// Add enqueues a batch of transactions into the pool if they are valid. Due
// to the large transaction churn, add may postpone fully integrating the tx
// to a later point to batch multiple ones together.
Expand Down
4 changes: 3 additions & 1 deletion eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,9 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction)
if b.disableTxPool {
return nil
}
return b.eth.txPool.Add([]*types.Transaction{signedTx}, true, false)[0]
errCh := make(chan error)
b.eth.txPool.AddSingle(signedTx, errCh)
return <-errCh
}

func (b *EthAPIBackend) SendBundle(ctx context.Context, bundle *types.Bundle, originBundle *types.SendBundleArgs) error {
Expand Down

0 comments on commit 79e9a32

Please sign in to comment.