Skip to content

Commit

Permalink
add parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
andyzhang2023 committed Nov 25, 2024
1 parent 65b4c13 commit c32122e
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 3 deletions.
4 changes: 4 additions & 0 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,10 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
return item
}

func (pool *BlobPool) AddFromAPI(tx *types.Transaction) error {
return pool.Add([]*types.Transaction{tx}, false, false)[0]
}

// Add inserts a set of blob transactions into the pool if they pass validation (both
// consensus validity and pool restrictions).
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
Expand Down
11 changes: 8 additions & 3 deletions core/txpool/bundlepool/bundlepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"container/heap"
"context"
"errors"
mapset "github.com/deckarep/golang-set/v2"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/rpc"
"math/big"
"sync"
"time"

mapset "github.com/deckarep/golang-set/v2"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/rpc"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
Expand Down Expand Up @@ -289,6 +290,10 @@ func (p *BundlePool) Get(hash common.Hash) *types.Transaction {
return nil
}

func (pool *BundlePool) AddFromAPI(tx *types.Transaction) error {
return nil
}

// Add enqueues a batch of transactions into the pool if they are valid. Due
// to the large transaction churn, add may postpone fully integrating the tx
// to a later point to batch multiple ones together.
Expand Down
58 changes: 58 additions & 0 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"math"
"math/big"
"runtime"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -304,6 +305,8 @@ type LegacyPool struct {
changesSinceReorg int // A counter for how many drops we've performed in-between reorg.

l1CostFn txpool.L1CostFunc // To apply L1 costs as rollup, optional field, may be nil.

apiTxs chan *apiTx
}

type txpoolResetRequest struct {
Expand Down Expand Up @@ -333,6 +336,7 @@ func New(config Config, chain BlockChain) *LegacyPool {
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
pendingCache: newCacheForMiner(),
apiTxs: make(chan *apiTx, 128),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
Expand Down Expand Up @@ -410,6 +414,10 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
}
pool.wg.Add(1)
go pool.loop()

for i := 0; i < runtime.NumCPU(); i++ {
go pool.apiTxLoop()
}
return nil
}

Expand Down Expand Up @@ -1180,6 +1188,56 @@ func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error {
return pool.Add([]*types.Transaction{tx}, false, true)[0]
}

func (pool *LegacyPool) AddFromAPI(tx *types.Transaction) error {
err := make(chan error, 1)
pool.apiTxs <- &apiTx{tx, err}
return <-err
}

type apiTx struct {
tx *types.Transaction
err chan error
}

type apiTxs []*apiTx

func (atxs apiTxs) unpack() []*types.Transaction {
txs := make([]*types.Transaction, len(atxs))
for i, atx := range atxs {
txs[i] = atx.tx
}
return txs
}

func (pool *LegacyPool) apiTxLoop() {
var cache []*apiTx
var lastDone chan error
for {
if lastDone == nil && len(cache) > 0 {
// start a new handle routine to add transactions into pool
lastDone = make(chan error)
temp := apiTxs(cache)
cache = nil
go func() {
defer close(lastDone)
errs := pool.Add(temp.unpack(), false, false)
for i, err := range errs {
temp[i].err <- err
}
}()
}
select {
case <-pool.reorgShutdownCh:
return
case <-lastDone:
lastDone = nil
case apiTx := <-pool.apiTxs:
// Add the transaction to the pool
cache = append(cache, apiTx)
}
}
}

// Add enqueues a batch of transactions into the pool if they are valid. Depending
// on the local flag, full pricing constraints will or will not be applied.
//
Expand Down
2 changes: 2 additions & 0 deletions core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ type SubPool interface {
// to a later point to batch multiple ones together.
Add(txs []*types.Transaction, local bool, sync bool) []error

AddFromAPI(tx *types.Transaction) error

// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce.
//
Expand Down
10 changes: 10 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,16 @@ func (p *TxPool) Get(hash common.Hash) *types.Transaction {
return nil
}

func (p *TxPool) AddFromAPI(tx *types.Transaction) error {
for _, subpool := range p.subpools {
if subpool.Filter(tx) {
//@TODO: should make it more generic
return subpool.AddFromAPI(tx)
}
}
return core.ErrTxTypeNotSupported
}

// Add enqueues a batch of transactions into the pool if they are valid. Due
// to the large transaction churn, add may postpone fully integrating the tx
// to a later point to batch multiple ones together.
Expand Down

0 comments on commit c32122e

Please sign in to comment.