Skip to content

Commit

Permalink
feat: reannounce local pending transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
j75689 committed Jun 13, 2023
1 parent 3b49b69 commit f257d62
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 16 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ var (
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolLifetimeFlag,
utils.TxPoolReannounceTimeFlag,
utils.SyncModeFlag,
utils.SyncTargetFlag,
utils.ExitWhenSyncedFlag,
Expand Down
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,12 @@ var (
Value: ethconfig.Defaults.TxPool.Lifetime,
Category: flags.TxPoolCategory,
}
TxPoolReannounceTimeFlag = &cli.DurationFlag{
Name: "txpool.reannouncetime",
Usage: "Duration for announcing local pending transactions again (default = 10 years, minimum = 1 minute)",
Value: ethconfig.Defaults.TxPool.ReannounceTime,
Category: flags.TxPoolCategory,
}

// Performance tuning settings
CacheFlag = &cli.IntFlag{
Expand Down Expand Up @@ -1654,6 +1660,9 @@ func setTxPool(ctx *cli.Context, cfg *txpool.Config) {
if ctx.IsSet(TxPoolLifetimeFlag.Name) {
cfg.Lifetime = ctx.Duration(TxPoolLifetimeFlag.Name)
}
if ctx.IsSet(TxPoolReannounceTimeFlag.Name) {
cfg.ReannounceTime = ctx.Duration(TxPoolReannounceTimeFlag.Name)
}
}

func setEthash(ctx *cli.Context, cfg *ethconfig.Config) {
Expand Down
3 changes: 3 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }

// ReannoTxsEvent is posted when a batch of local pending transactions exceed a specified duration.
type ReannoTxsEvent struct{ Txs []*types.Transaction }

// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }

Expand Down
74 changes: 60 additions & 14 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ const (
// more expensive to propagate; larger transactions also take more resources
// to validate whether they fit into the pool or not.
txMaxSize = 4 * txSlotSize // 128KB

// txReannoMaxNum is the maximum number of transactions a reannounce action can include.
txReannoMaxNum = 1024
)

var (
Expand Down Expand Up @@ -103,7 +106,8 @@ var (
// L1 Info Gas Overhead is the amount of gas the the L1 info deposit consumes.
// It is removed from the tx pool max gas to better indicate that L2 transactions
// are not able to consume all of the gas in a L2 block as the L1 info deposit is always present.
l1InfoGasOverhead = uint64(70_000)
l1InfoGasOverhead = uint64(70_000)
reannounceInterval = time.Minute // Time interval to check for reannounce transactions
)

var (
Expand Down Expand Up @@ -179,7 +183,8 @@ type Config struct {
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts

Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
ReannounceTime time.Duration // Duration for announcing local pending transactions again
}

// DefaultConfig contains the default configurations for the transaction
Expand All @@ -196,7 +201,8 @@ var DefaultConfig = Config{
AccountQueue: 64,
GlobalQueue: 1024,

Lifetime: 3 * time.Hour,
Lifetime: 3 * time.Hour,
ReannounceTime: 10 * 365 * 24 * time.Hour,
}

// sanitize checks the provided user configurations and changes anything that's
Expand Down Expand Up @@ -235,6 +241,10 @@ func (config *Config) sanitize() Config {
log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultConfig.Lifetime)
conf.Lifetime = DefaultConfig.Lifetime
}
if conf.ReannounceTime < time.Minute {
log.Warn("Sanitizing invalid txpool reannounce time", "provided", conf.ReannounceTime, "updated", time.Minute)
conf.ReannounceTime = time.Minute
}
return conf
}

Expand All @@ -246,14 +256,15 @@ func (config *Config) sanitize() Config {
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
config Config
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex
config Config
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
reannoTxFeed event.Feed // Event feed for announcing transactions again
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex

istanbul bool // Fork indicator whether we are in the istanbul stage.
eip2718 bool // Fork indicator whether we are using EIP-2718 type transactions.
Expand Down Expand Up @@ -358,14 +369,16 @@ func (pool *TxPool) loop() {
var (
prevPending, prevQueued, prevStales int
// Start the stats reporting and transaction eviction tickers
report = time.NewTicker(statsReportInterval)
evict = time.NewTicker(evictionInterval)
journal = time.NewTicker(pool.config.Rejournal)
report = time.NewTicker(statsReportInterval)
evict = time.NewTicker(evictionInterval)
reannounce = time.NewTicker(reannounceInterval)
journal = time.NewTicker(pool.config.Rejournal)
// Track the previous head headers for transaction reorgs
head = pool.chain.CurrentBlock()
)
defer report.Stop()
defer evict.Stop()
defer reannounce.Stop()
defer journal.Stop()

// Notify tests that the init phase is done
Expand Down Expand Up @@ -415,6 +428,33 @@ func (pool *TxPool) loop() {
}
pool.mu.Unlock()

case <-reannounce.C:
pool.mu.RLock()
reannoTxs := func() []*types.Transaction {
txs := make([]*types.Transaction, 0)
for addr, list := range pool.pending {
if !pool.locals.contains(addr) {
continue
}

for _, tx := range list.Flatten() {
// Default ReannounceTime is 10 years, won't announce by default.
if time.Since(tx.Time()) < pool.config.ReannounceTime {
break
}
txs = append(txs, tx)
if len(txs) >= txReannoMaxNum {
return txs
}
}
}
return txs
}()
pool.mu.RUnlock()
if len(reannoTxs) > 0 {
pool.reannoTxFeed.Send(core.ReannoTxsEvent{reannoTxs})
}

// Handle local transaction journal rotation
case <-journal.C:
if pool.journal != nil {
Expand Down Expand Up @@ -449,6 +489,12 @@ func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subsc
return pool.scope.Track(pool.txFeed.Subscribe(ch))
}

// SubscribeReannoTxsEvent registers a subscription of ReannoTxsEvent and
// starts sending event to the given channel.
func (pool *TxPool) SubscribeReannoTxsEvent(ch chan<- core.ReannoTxsEvent) event.Subscription {
return pool.scope.Track(pool.reannoTxFeed.Subscribe(ch))
}

// GasPrice returns the current gas price enforced by the transaction pool.
func (pool *TxPool) GasPrice() *big.Int {
pool.mu.RLock()
Expand Down
5 changes: 5 additions & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ type TxData interface {
effectiveGasPrice(dst *big.Int, baseFee *big.Int) *big.Int
}

// Time returns transaction's time
func (tx *Transaction) Time() time.Time {
return tx.time
}

// EncodeRLP implements rlp.Encoder
func (tx *Transaction) EncodeRLP(w io.Writer) error {
if tx.Type() == LegacyTxType {
Expand Down
44 changes: 44 additions & 0 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type txPool interface {
// SubscribeNewTxsEvent should return an event subscription of
// NewTxsEvent and send events to the given channel.
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription

// SubscribeReannoTxsEvent should return an event subscription of
// ReannoTxsEvent and send events to the given channel.
SubscribeReannoTxsEvent(chan<- core.ReannoTxsEvent) event.Subscription
}

// handlerConfig is the collection of initialization parameters to create a full
Expand Down Expand Up @@ -116,6 +120,8 @@ type handler struct {
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
reannoTxsCh chan core.ReannoTxsEvent
reannoTxsSub event.Subscription
minedBlockSub *event.TypeMuxSubscription

requiredBlocks map[uint64]common.Hash
Expand Down Expand Up @@ -552,6 +558,12 @@ func (h *handler) Start(maxPeers int) {
h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh)
go h.txBroadcastLoop()

// announce local pending transactions again
h.wg.Add(1)
h.reannoTxsCh = make(chan core.ReannoTxsEvent, txChanSize)
h.reannoTxsSub = h.txpool.SubscribeReannoTxsEvent(h.reannoTxsCh)
go h.txReannounceLoop()

// broadcast mined blocks
h.wg.Add(1)
h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{})
Expand All @@ -564,6 +576,7 @@ func (h *handler) Start(maxPeers int) {

func (h *handler) Stop() {
h.txsSub.Unsubscribe() // quits txBroadcastLoop
h.reannoTxsSub.Unsubscribe() // quits txReannounceLoop
h.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop

// Quit chainSync and txsync64.
Expand Down Expand Up @@ -668,6 +681,24 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
"tx packs", directPeers, "broadcast txs", directCount)
}

// ReannounceTransactions will announce a batch of local pending transactions
// to a square root of all peers.
func (h *handler) ReannounceTransactions(txs types.Transactions) {
hashes := make([]common.Hash, 0, txs.Len())
for _, tx := range txs {
hashes = append(hashes, tx.Hash())
}

// Announce transactions hash to a batch of peers
peersCount := uint(math.Sqrt(float64(h.peers.len())))
peers := h.peers.headPeers(peersCount)
for _, peer := range peers {
peer.AsyncSendPooledTransactionHashes(hashes)
}
log.Debug("Transaction reannounce", "txs", len(txs),
"announce packs", peersCount, "announced hashes", peersCount*uint(len(hashes)))
}

// minedBroadcastLoop sends mined blocks to connected peers.
func (h *handler) minedBroadcastLoop() {
defer h.wg.Done()
Expand All @@ -692,3 +723,16 @@ func (h *handler) txBroadcastLoop() {
}
}
}

// txReannounceLoop announces local pending transactions to connected peers again.
func (h *handler) txReannounceLoop() {
defer h.wg.Done()
for {
select {
case event := <-h.reannoTxsCh:
h.ReannounceTransactions(event.Txs)
case <-h.reannoTxsSub.Err():
return
}
}
}
53 changes: 53 additions & 0 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,59 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
}
}

// Tests that local pending transactions get propagated to peers.
func TestTransactionPendingReannounce(t *testing.T) {
t.Parallel()

// Create a source handler to announce transactions from and a sink handler
// to receive them.
source := newTestHandler()
defer source.close()

sink := newTestHandler()
defer sink.close()
sink.handler.acceptTxs = 1 // mark synced to accept transactions

sourcePipe, sinkPipe := p2p.MsgPipe()
defer sourcePipe.Close()
defer sinkPipe.Close()

sourcePeer := eth.NewPeer(eth.ETH66, p2p.NewPeer(enode.ID{0}, "", nil), sourcePipe, source.txpool)
sinkPeer := eth.NewPeer(eth.ETH66, p2p.NewPeer(enode.ID{0}, "", nil), sinkPipe, sink.txpool)
defer sourcePeer.Close()
defer sinkPeer.Close()

go source.handler.runEthPeer(sourcePeer, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(source.handler), peer)
})
go sink.handler.runEthPeer(sinkPeer, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(sink.handler), peer)
})

// Subscribe transaction pools
txCh := make(chan core.NewTxsEvent, 1024)
sub := sink.txpool.SubscribeNewTxsEvent(txCh)
defer sub.Unsubscribe()

txs := make([]*types.Transaction, 64)
for nonce := range txs {
tx := types.NewTransaction(uint64(nonce), common.Address{}, big.NewInt(0), 100000, big.NewInt(0), nil)
tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey)

txs[nonce] = tx
}
source.txpool.ReannouceTransactions(txs)

for arrived := 0; arrived < len(txs); {
select {
case event := <-txCh:
arrived += len(event.Txs)
case <-time.NewTimer(time.Second).C:
t.Errorf("sink: transaction propagation timed out: have %d, want %d", arrived, len(txs))
}
}
}

// Tests that post eth protocol handshake, clients perform a mutual checkpoint
// challenge to validate each other's chains. Hash mismatches, or missing ones
// during a fast sync should lead to the peer getting dropped.
Expand Down
23 changes: 21 additions & 2 deletions eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ var (
type testTxPool struct {
pool map[common.Hash]*types.Transaction // Hash map of collected transactions

txFeed event.Feed // Notification feed to allow waiting for inclusion
lock sync.RWMutex // Protects the transaction pool
txFeed event.Feed // Notification feed to allow waiting for inclusion
reannoTxFeed event.Feed // Notification feed to trigger reannouce
lock sync.RWMutex // Protects the transaction pool
}

// newTestTxPool creates a mock transaction pool.
Expand Down Expand Up @@ -91,6 +92,18 @@ func (p *testTxPool) AddRemotes(txs []*types.Transaction) []error {
return make([]error, len(txs))
}

// ReannouceTransactions announce the transactions to some peers.
func (p *testTxPool) ReannouceTransactions(txs []*types.Transaction) []error {
p.lock.Lock()
defer p.lock.Unlock()

for _, tx := range txs {
p.pool[tx.Hash()] = tx
}
p.reannoTxFeed.Send(core.ReannoTxsEvent{Txs: txs})
return make([]error, len(txs))
}

// Pending returns all the transactions known to the pool
func (p *testTxPool) Pending(enforceTips bool) map[common.Address]types.Transactions {
p.lock.RLock()
Expand All @@ -113,6 +126,12 @@ func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subs
return p.txFeed.Subscribe(ch)
}

// SubscribeReannoTxsEvent should return an event subscription of ReannoTxsEvent and
// send events to the given channel.
func (p *testTxPool) SubscribeReannoTxsEvent(ch chan<- core.ReannoTxsEvent) event.Subscription {
return p.reannoTxFeed.Subscribe(ch)
}

// testHandler is a live implementation of the Ethereum protocol handler, just
// preinitialized with some sane testing defaults and the transaction pool mocked
// out.
Expand Down
Loading

0 comments on commit f257d62

Please sign in to comment.