txpool: enhance some logs and metrics on broadcasting and announcing (bnb-chain#41)

* txpool: enhance some logs and metrics on broadcasting and announcing

* add transaction hash details in warning logs

* feature: add tx send logs

* enhance logs for announcement

* enhance logs for announcement

* fix metric txpool/valid

* fix comment

---------

Co-authored-by: andyzhang2023 <[email protected]>
Co-authored-by: redhdx <[email protected]>
Co-authored-by: Owen <[email protected]>
4 people authored Jan 5, 2024
1 parent 7a5b85e commit 69afb85
Showing 4 changed files with 49 additions and 1 deletion.
3 changes: 2 additions & 1 deletion core/txpool/txpool.go

@@ -1113,10 +1113,10 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error,
 		replaced, err := pool.add(tx, local)
 		errs[i] = err
 		if err == nil && !replaced {
-			validTxMeter.Mark(1)
 			dirty.addTx(tx)
 		}
 	}
+	validTxMeter.Mark(int64(len(dirty.accounts)))
 	return errs, dirty
 }

@@ -1539,6 +1539,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
 			if pool.promoteTx(addr, hash, tx) {
 				promoted = append(promoted, tx)
 			}
+			log.Trace("Promoted queued transaction", "hash", hash)
 		}
 		log.Trace("Promoted queued transactions", "count", len(promoted))
 		queuedGauge.Dec(int64(len(readies)))
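The txpool/valid fix above replaces a per-transaction Mark(1) inside the loop with a single Mark per batch, sized by the number of accounts that gained new executable transactions. A minimal sketch of the meter API involved (the registry name matches the metric being fixed; the surrounding program is illustrative, not part of the commit):

package main

import "github.com/ethereum/go-ethereum/metrics"

func main() {
	// Meters are registered once under a path-style name and then marked
	// with an event count. Note: go-ethereum meters are no-ops unless
	// metrics collection is enabled at process startup.
	validTxMeter := metrics.NewRegisteredMeter("txpool/valid", nil)

	// After the fix, the meter is marked once per addTxsLocked batch with
	// the number of dirty accounts, not once per accepted transaction.
	dirtyAccounts := int64(7) // illustrative value
	validTxMeter.Mark(dirtyAccounts)
}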
24 changes: 24 additions & 0 deletions eth/fetcher/tx_fetcher.go

@@ -22,6 +22,7 @@ import (
 	"fmt"
 	mrand "math/rand"
 	"sort"
+	"strings"
 	"time"

 	"github.com/ethereum/go-ethereum/common"

@@ -237,6 +238,8 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
 			duplicate++
 		case f.isKnownUnderpriced(hash):
 			underpriced++
+			log.Info("announced transaction is underpriced", "hash", hash.String())
+
 		default:
 			unknowns = append(unknowns, hash)
 		}

@@ -323,6 +326,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)

 			default:
 				otherreject++
+				log.Warn("Peer's transaction rejected", "peer", peer, "txHash", batch[j].Hash().String(), "err", err.Error())
 			}
 			added = append(added, batch[j].Hash())
 		}

@@ -389,11 +393,13 @@ func (f *TxFetcher) loop() {
 			// check. Should be fine as the limit is in the thousands and the
 			// request size in the hundreds.
 			txAnnounceDOSMeter.Mark(int64(len(ann.hashes)))
+			log.Info("announced transaction DOS overflow", "hashes", joinHashes(ann.hashes), "num", len(ann.hashes))
 			break
 		}
 		want := used + len(ann.hashes)
 		if want > maxTxAnnounces {
 			txAnnounceDOSMeter.Mark(int64(want - maxTxAnnounces))
+			log.Info("announced transaction DOS overflow", "hashes", joinHashes(ann.hashes[want-maxTxAnnounces:]), "num", len(ann.hashes))
 			ann.hashes = ann.hashes[:want-maxTxAnnounces]
 		}
 		// All is well, schedule the remainder of the transactions

@@ -505,6 +511,7 @@ func (f *TxFetcher) loop() {
 		for peer, req := range f.requests {
 			if time.Duration(f.clock.Now()-req.time)+txGatherSlack > txFetchTimeout {
 				txRequestTimeoutMeter.Mark(int64(len(req.hashes)))
+				log.Info("announced transaction request timeout", "hashes", joinHashes(req.hashes), "num", len(req.hashes))

 				// Reschedule all the not-yet-delivered fetches to alternate peers
 				for _, hash := range req.hashes {

@@ -824,6 +831,7 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
 			// failure (e.g. peer disconnected), reschedule the hashes.
 			if err := f.fetchTxs(peer, hashes); err != nil {
 				txRequestFailMeter.Mark(int64(len(hashes)))
+				log.Info("announced transaction request failed", "hashes", joinHashes(hashes), "num", len(hashes))
 				f.Drop(peer)
 			}
 		})

@@ -916,3 +924,19 @@ func rotateHashes(slice []common.Hash, n int) {
 		slice[i] = orig[(i+n)%len(orig)]
 	}
 }
+
+// joinHashes concatenates hashes into a single string for debug logs; at most
+// 1024 hashes are included, to bound the cost of logging
+func joinHashes(hashes []common.Hash) string {
+	num := len(hashes)
+	if num > 1024 {
+		num = 1024
+	}
+	strs := make([]string, num)
+	for i, h := range hashes {
+		if i < num {
+			strs[i] = h.String()
+		}
+	}
+	return strings.Join(strs, ",")
+}
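joinHashes walks the entire input slice but only fills the first 1024 entries. An equivalent, slightly tighter formulation (illustrative only, not part of the commit; it assumes the same file's common and strings imports) slices first and then converts:

// joinHashesAlt behaves like joinHashes above: cap the input at 1024
// hashes, stringify each, and comma-join the results.
func joinHashesAlt(hashes []common.Hash) string {
	const maxLogged = 1024
	if len(hashes) > maxLogged {
		hashes = hashes[:maxLogged]
	}
	strs := make([]string, len(hashes))
	for i, h := range hashes {
		strs[i] = h.String()
	}
	return strings.Join(strs, ",")
}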
2 changes: 2 additions & 0 deletions eth/handler.go

@@ -662,10 +662,12 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
 		numDirect := int(math.Sqrt(float64(len(peers))))
 		for _, peer := range peers[:numDirect] {
 			txset[peer] = append(txset[peer], tx.Hash())
+			log.Trace("Broadcast transaction", "peer", peer.ID(), "hash", tx.Hash())
 		}
 		// For the remaining peers, send announcement only
 		for _, peer := range peers[numDirect:] {
 			annos[peer] = append(annos[peer], tx.Hash())
+			log.Trace("Announce transaction", "peer", peer.ID(), "hash", tx.Hash())
 		}
 	}
 	for peer, hashes := range txset {
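The two new trace logs mirror the existing split in BroadcastTransactions: full transactions go to roughly the square root of the peer set, and only hashes are announced to the rest. A standalone sketch of that split (peer names are illustrative):

package main

import (
	"fmt"
	"math"
)

func main() {
	peers := []string{"p1", "p2", "p3", "p4", "p5", "p6", "p7", "p8", "p9"}

	// Send the full transaction to sqrt(len(peers)) peers and announce
	// only the hash to the remainder, as BroadcastTransactions does above.
	numDirect := int(math.Sqrt(float64(len(peers)))) // 3 of 9 peers
	fmt.Println("direct:", peers[:numDirect])
	fmt.Println("announce:", peers[numDirect:])
}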
21 changes: 21 additions & 0 deletions eth/protocols/eth/broadcast.go

@@ -23,6 +23,12 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/common/gopool"
 	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/metrics"
 )

+var (
+	txAnnounceAbandonMeter  = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/abandon", nil)
+	txBroadcastAbandonMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/abandon", nil)
+)
+
 const (

@@ -62,6 +68,14 @@ func (p *Peer) broadcastBlocks() {
 	}
 }

+// safeGetPeerIP returns the peer's IP address, or "UNKNOWN" when the node
+// record is unavailable.
+var safeGetPeerIP = func(p *Peer) string {
+	if p.Node() != nil && p.Node().IP() != nil {
+		return p.Node().IP().String()
+	}
+	return "UNKNOWN"
+}
+
 func collectHashes(txs []*types.Transaction) []common.Hash {
 	hashes := make([]common.Hash, len(txs))
 	for i, tx := range txs {

@@ -111,6 +125,7 @@ func (p *Peer) broadcastTransactions() {
 			done = make(chan struct{})
 			gopool.Submit(func() {
 				if err := p.SendTransactions(txs); err != nil {
+					p.Log().Warn("Broadcast transactions failed", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "lost", len(txs), "hashes", concat(collectHashes(txs)), "err", err.Error())
 					fail <- err
 					return
 				}

@@ -130,6 +145,8 @@ func (p *Peer) broadcastTransactions() {
 			queue = append(queue, hashes...)
 			if len(queue) > maxQueuedTxs {
 				// Fancy copy and resize to ensure buffer doesn't grow indefinitely
+				p.Log().Warn("Broadcast hashes abandon", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "abandon", len(queue)-maxQueuedTxs, "hashes", concat(queue[:len(queue)-maxQueuedTxs]))
+				txBroadcastAbandonMeter.Mark(int64(len(queue) - maxQueuedTxs))
 				queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])]
 			}

@@ -183,11 +200,13 @@ func (p *Peer) announceTransactions() {
 			gopool.Submit(func() {
 				if p.version >= ETH68 {
 					if err := p.sendPooledTransactionHashes68(pending, pendingTypes, pendingSizes); err != nil {
+						p.Log().Warn("Announce hashes68 failed", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "lost", len(pending), "hashes", concat(pending), "err", err.Error())
 						fail <- err
 						return
 					}
 				} else {
 					if err := p.sendPooledTransactionHashes66(pending); err != nil {
+						p.Log().Warn("Announce hashes66 failed", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "lost", len(pending), "hashes", concat(pending), "err", err.Error())
 						fail <- err
 						return
 					}

@@ -207,6 +226,8 @@ func (p *Peer) announceTransactions() {
 			// New batch of transactions to be broadcast, queue them (with cap)
 			queue = append(queue, hashes...)
 			if len(queue) > maxQueuedTxAnns {
+				p.Log().Warn("Announce hashes abandon", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "abandon", len(queue)-maxQueuedTxAnns, "hashes", concat(queue[:len(queue)-maxQueuedTxAnns]))
+				txAnnounceAbandonMeter.Mark(int64(len(queue) - maxQueuedTxAnns))
 				// Fancy copy and resize to ensure buffer doesn't grow indefinitely
 				queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxAnns:])]
 			}
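Both the broadcast and announcement queues trim with the same "fancy copy and resize" idiom, which drops the oldest entries while reusing the backing array instead of reallocating. A standalone sketch (values illustrative):

package main

import "fmt"

func main() {
	const maxQueued = 4
	queue := []string{"h1", "h2", "h3", "h4", "h5", "h6"}

	if len(queue) > maxQueued {
		// Shift the newest maxQueued entries to the front, then truncate;
		// copy returns the number of elements copied, so the slice keeps
		// its backing array rather than growing indefinitely.
		queue = queue[:copy(queue, queue[len(queue)-maxQueued:])]
	}
	fmt.Println(queue) // [h3 h4 h5 h6]
}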
