Skip to content

Commit

Permalink
Merge branch 'debug/reannounce-with-body' into v0.2.1-txpool-reannoun…
Browse files Browse the repository at this point in the history
…ce-freq
  • Loading branch information
andyzhang2023 committed Dec 17, 2023
2 parents 866a683 + e86ab2e commit e968f1e
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 1 deletion.
64 changes: 64 additions & 0 deletions eth/protocols/eth/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,70 @@ func (p *Peer) broadcastTransactions() {
}
}

func (p *Peer) announceWithBody() {
var (
queue []common.Hash // Queue of hashes to broadcast as full transactions
done chan struct{} // Non-nil if background broadcaster is running
fail = make(chan error, 1) // Channel used to receive network error
failed bool // Flag whether a send failed, discard everything onward
)
for {
// If there's no in-flight broadcast running, check if a new one is needed
if done == nil && len(queue) > 0 {
// Pile transaction until we reach our allowed network limit
var (
hashesCount uint64
txs []*types.Transaction
size common.StorageSize
)
for i := 0; i < len(queue) && size < maxTxPacketSize; i++ {
if tx := p.txpool.Get(queue[i]); tx != nil {
txs = append(txs, tx)
size += common.StorageSize(tx.Size())
}
hashesCount++
}
queue = queue[:copy(queue, queue[hashesCount:])]

// If there's anything available to transfer, fire up an async writer
if len(txs) > 0 {
done = make(chan struct{})
gopool.Submit(func() {
if err := p.SendTransactions(txs); err != nil {
fail <- err
return
}
close(done)
p.Log().Trace("Sent transactions", "count", len(txs))
})
}
}
// Transfer goroutine may or may not have been started, listen for events
select {
case hashes := <-p.txAnnounce:
// If the connection failed, discard all transaction events
if failed {
continue
}
// New batch of transactions to be broadcast, queue them (with cap)
queue = append(queue, hashes...)
if len(queue) > maxQueuedTxs {
// Fancy copy and resize to ensure buffer doesn't grow indefinitely
queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])]
}

case <-done:
done = nil

case <-fail:
failed = true

case <-p.term:
return
}
}
}

// announceTransactions is a write loop that schedules transaction broadcasts
// to the remote peer. The goal is to have an async writer that does not lock up
// node internals and at the same time rate limits queued data.
Expand Down
2 changes: 1 addition & 1 deletion eth/protocols/eth/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe
// Start up all the broadcasters
go peer.broadcastBlocks()
go peer.broadcastTransactions()
go peer.announceTransactions()
go peer.announceWithBody()
go peer.dispatcher()

return peer
Expand Down

0 comments on commit e968f1e

Please sign in to comment.