diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index b613d0acc5..34513fd6e3 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -1202,6 +1202,10 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction { return item } +func (pool *BlobPool) AddFromAPI(tx *types.Transaction) error { + return pool.Add([]*types.Transaction{tx}, false, false)[0] +} + // Add inserts a set of blob transactions into the pool if they pass validation (both // consensus validity and pool restrictions). func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error { diff --git a/core/txpool/bundlepool/bundlepool.go b/core/txpool/bundlepool/bundlepool.go index a1b0e1a838..57f2cc4423 100644 --- a/core/txpool/bundlepool/bundlepool.go +++ b/core/txpool/bundlepool/bundlepool.go @@ -4,13 +4,14 @@ import ( "container/heap" "context" "errors" - mapset "github.com/deckarep/golang-set/v2" - "github.com/ethereum/go-ethereum/miner" - "github.com/ethereum/go-ethereum/rpc" "math/big" "sync" "time" + mapset "github.com/deckarep/golang-set/v2" + "github.com/ethereum/go-ethereum/miner" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" @@ -289,6 +290,10 @@ func (p *BundlePool) Get(hash common.Hash) *types.Transaction { return nil } +func (pool *BundlePool) AddFromAPI(tx *types.Transaction) error { + return nil +} + // Add enqueues a batch of transactions into the pool if they are valid. Due // to the large transaction churn, add may postpone fully integrating the tx // to a later point to batch multiple ones together. diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 2151a49053..52ca4d17c1 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -21,6 +21,7 @@ import ( "errors" "math" "math/big" + "runtime" "sort" "sync" "sync/atomic" @@ -317,6 +318,8 @@ type LegacyPool struct { changesSinceReorg int // A counter for how many drops we've performed in-between reorg. l1CostFn txpool.L1CostFunc // To apply L1 costs as rollup, optional field, may be nil. + + apiTxs chan *apiTx } func (pool *LegacyPool) reportMetrics(oldHead, newHead *types.Header) { @@ -381,6 +384,7 @@ func New(config Config, chain BlockChain) *LegacyPool { reorgShutdownCh: make(chan struct{}), initDoneCh: make(chan struct{}), pendingCache: newCacheForMiner(), + apiTxs: make(chan *apiTx, 128), } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { @@ -462,6 +466,10 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A } pool.wg.Add(1) go pool.loop() + + for i := 0; i < runtime.NumCPU(); i++ { + go pool.apiTxLoop() + } return nil } @@ -1232,6 +1240,56 @@ func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error { return pool.Add([]*types.Transaction{tx}, false, true)[0] } +func (pool *LegacyPool) AddFromAPI(tx *types.Transaction) error { + err := make(chan error, 1) + pool.apiTxs <- &apiTx{tx, err} + return <-err +} + +type apiTx struct { + tx *types.Transaction + err chan error +} + +type apiTxs []*apiTx + +func (atxs apiTxs) unpack() []*types.Transaction { + txs := make([]*types.Transaction, len(atxs)) + for i, atx := range atxs { + txs[i] = atx.tx + } + return txs +} + +func (pool *LegacyPool) apiTxLoop() { + var cache []*apiTx + var lastDone chan error + for { + if lastDone == nil && len(cache) > 0 { + // start a new handle routine to add transactions into pool + lastDone = make(chan error) + temp := apiTxs(cache) + cache = nil + go func() { + defer close(lastDone) + errs := pool.Add(temp.unpack(), false, false) + for i, err := range errs { + temp[i].err <- err + } + }() + } + select { + case <-pool.reorgShutdownCh: + return + case <-lastDone: + lastDone = nil + case apiTx := <-pool.apiTxs: + // Add the transaction to the pool + cache = append(cache, apiTx) + } + } +} + // Add enqueues a batch of transactions into the pool if they are valid. Depending // on the local flag, full pricing constraints will or will not be applied. // diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index e4f89d07f6..5160d7c733 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -128,6 +128,8 @@ type SubPool interface { // to a later point to batch multiple ones together. Add(txs []*types.Transaction, local bool, sync bool) []error + AddFromAPI(tx *types.Transaction) error + // Pending retrieves all currently processable transactions, grouped by origin // account and sorted by nonce. // diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 522a2f74c2..50bb3e9e27 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -307,6 +307,16 @@ func (p *TxPool) Get(hash common.Hash) *types.Transaction { return nil } +func (p *TxPool) AddFromAPI(tx *types.Transaction) error { + for _, subpool := range p.subpools { + if subpool.Filter(tx) { + //@TODO: should make it more generic + return subpool.AddFromAPI(tx) + } + } + return core.ErrTxTypeNotSupported +} + // Add enqueues a batch of transactions into the pool if they are valid. Due // to the large transaction churn, add may postpone fully integrating the tx // to a later point to batch multiple ones together. diff --git a/eth/api_backend.go b/eth/api_backend.go index f53561d918..d7b5af0431 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -313,7 +313,8 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) return nil } // Retain tx in local tx pool after forwarding, for local RPC usage. - if err := b.eth.txPool.Add([]*types.Transaction{signedTx}, true, false)[0]; err != nil { + //if err := b.eth.txPool.Add([]*types.Transaction{signedTx}, true, false)[0]; err != nil { + if err := b.eth.txPool.AddFromAPI(signedTx); err != nil { log.Warn("successfully sent tx to sequencer, but failed to persist in local tx pool", "err", err, "tx", signedTx.Hash()) } return nil