Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TxPool] Broadcasting stuck transaction #3666

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions core/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package core

import (
prom "github.com/harmony-one/harmony/api/service/prometheus"
"github.com/prometheus/client_golang/prometheus"
)

func init() {
prom.PromRegistry().MustRegister(
receivedTxsCounter,
stuckTxsCounter,
invalidTxsCounterVec,
knownTxsCounter,
lowNonceTxCounter,
pendingTxGauge,
queuedTxGauge,
)
}

var (
receivedTxsCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "hmy",
Subsystem: "txPool",
Name: "received",
Help: "number of transactions received",
},
)

stuckTxsCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "hmy",
Subsystem: "txPool",
Name: "stuck",
Help: "number of transactions observed to be stuck and broadcast again",
},
)

invalidTxsCounterVec = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "hmy",
Subsystem: "txPool",
Name: "invalid",
Help: "transactions failed validation",
},
[]string{"err"},
)

knownTxsCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "hmy",
Subsystem: "txPool",
Name: "known",
Help: "number of known transaction received",
},
)

lowNonceTxCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "hmy",
Subsystem: "txPool",
Name: "low_nonce",
Help: "number of transactions removed because of low nonce (including processed transactions)",
},
)

pendingTxGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "hmy",
Subsystem: "txPool",
Name: "pending",
Help: "number of executable transactions",
},
)

queuedTxGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "hmy",
Subsystem: "txPool",
Name: "queued",
Help: "number of queued non-executable transactions",
},
)
)
31 changes: 26 additions & 5 deletions core/tx_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,18 +236,34 @@ func (m *txSortedMap) Len() int {
func (m *txSortedMap) Flatten() types.PoolTransactions {
// If the sorting was not cached yet, create and cache it
if m.cache == nil {
m.cache = make(types.PoolTransactions, 0, len(m.items))
for _, tx := range m.items {
m.cache = append(m.cache, tx)
}
sort.Sort(types.PoolTxByNonce(m.cache))
m.cache = m.sortedTxs()
}
// Copy the cache to prevent accidental modifications
txs := make(types.PoolTransactions, len(m.cache))
copy(txs, m.cache)
return txs
}

// Peek return the transaction with the lowest nonce.
func (m *txSortedMap) Peek() types.PoolTransaction {
if m.cache == nil {
m.cache = m.sortedTxs()
}
if len(m.cache) == 0 {
return nil
}
return m.cache[0]
}

func (m *txSortedMap) sortedTxs() types.PoolTransactions {
cache := make(types.PoolTransactions, 0, len(m.items))
for _, tx := range m.items {
cache = append(cache, tx)
}
sort.Sort(types.PoolTxByNonce(cache))
return cache
}

const stakingTxCheckThreshold = 10 // check staking transaction validation every 10 blocks

// txList is a "list" of transactions belonging to an account, sorted by account
Expand Down Expand Up @@ -444,6 +460,11 @@ func (l *txList) Flatten() types.PoolTransactions {
return l.txs.Flatten()
}

// Peek return a transaction with the lowest nonce.
func (l *txList) Peek() types.PoolTransaction {
return l.txs.Peek()
}

// priceHeap is a heap.Interface implementation over transactions for retrieving
// price-sorted transactions to discard when the pool fills up.
type priceHeap []types.PoolTransaction
Expand Down
128 changes: 121 additions & 7 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,20 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/metrics"
"github.com/harmony-one/harmony/internal/params"
"github.com/fwojciec/clock"
"github.com/pkg/errors"

"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
hmyCommon "github.com/harmony-one/harmony/internal/common"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/shard"
staking "github.com/harmony-one/harmony/staking/types"
Expand Down Expand Up @@ -98,13 +101,37 @@ var (

// ErrBlacklistTo is returned if a transaction's to/destination address is blacklisted
ErrBlacklistTo = errors.New("`to` address of transaction in blacklist")

// ErrStaking is returned if a staking transaction fails validation
ErrStaking = errors.New("staking transaction validation failed")
)

var (
evictionInterval = time.Minute // Time interval to check for evictable transactions
statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
)

const (
// Check and broadcast "stuck" executable transactions.
// A transaction is defined as stuck:
// 1. Transaction is executable (exist in txPool.pending)
// 2. exist in pending for at least txStuckThreshold
// Broadcast check is executed with an interval randomly chosen from
// broadcastMaxInterval and broadcastMinInterval. This is to prevent
// several nodes broadcast the same transaction at the same time in
// pub-sub.
// TODO: A better way to prevent broadcasting the same transaction
// multiple times is to broadcast the pending `txHash` periodically,
// and the leader is responsible for collecting the transaction data
// in a p2p stream. This will cut the cost of broadcasting transactions,
// but shall be extremely careful about the security from leader's
// perspective.
broadcastMaxInterval = 5 * time.Minute
broadcastMinInterval = 1 * time.Minute
txStuckThreshold = 1 * time.Minute
)

// TODO: fully replace these counters
var (
// Metrics for the pending pool
pendingDiscardCounter = metrics.NewRegisteredCounter("txpool/pending/discard", nil)
Expand Down Expand Up @@ -159,7 +186,8 @@ type TxPoolConfig struct {
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts

Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
Broadcast func(types.PoolTransactions) // broadcast invalid transaction
Lifetime time.Duration // Maximum amount of time non-executable transaction are queued

Blacklist map[common.Address]struct{} // Set of accounts that cannot be a part of any transaction
}
Expand Down Expand Up @@ -212,6 +240,9 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
utils.Logger().Warn().Msg("Sanitizing nil blacklist set")
conf.Blacklist = DefaultTxPoolConfig.Blacklist
}
if conf.Broadcast == nil {
utils.Logger().Warn().Msg("broadcast function is nil, unable to broadcast stuck transaction")
}

return conf
}
Expand All @@ -238,6 +269,8 @@ type TxPool struct {
pendingState *state.ManagedState // Pending state tracking virtual nonces
currentMaxGas uint64 // Current gas limit for transaction caps

broadcast func(tx types.PoolTransactions) // broadcast timed out executable transactions

locals *accountSet // Set of local transaction to exempt from eviction rules
journal *txJournal // Journal of local transaction to back up to disk

Expand Down Expand Up @@ -275,6 +308,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig,
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
txErrorSink: txErrorSink,
broadcast: config.Broadcast,
}
pool.locals = newAccountSet(chainconfig.ChainID)
for _, addr := range config.Locals {
Expand Down Expand Up @@ -323,6 +357,9 @@ func (pool *TxPool) loop() {
journal := time.NewTicker(pool.config.Rejournal)
defer journal.Stop()

broadcast := clock.NewRandomTicker(broadcastMinInterval, broadcastMaxInterval)
defer broadcast.Stop()

// Track the previous head headers for transaction reorgs
head := pool.chain.CurrentBlock()

Expand All @@ -341,6 +378,11 @@ func (pool *TxPool) loop() {
}
pool.reset(head.Header(), ev.Block.Header())
head = ev.Block

pendings, queued := pool.stats()
pendingTxGauge.Set(float64(pendings))
queuedTxGauge.Set(float64(queued))

pool.mu.Unlock()
}
// Be unsubscribed due to system stopped
Expand Down Expand Up @@ -394,6 +436,23 @@ func (pool *TxPool) loop() {
}
pool.mu.Unlock()
}

// broadcast stuck transactions from pendings
case <-broadcast.C:
pool.mu.Lock()
stuckTxs := pool.stuckExecutables()
for _, tx := range stuckTxs {
addr, _ := tx.SenderAddress()
pool.beats[addr] = time.Now()
}
pool.mu.Unlock()

if pool.broadcast != nil && len(stuckTxs) != 0 {
utils.Logger().Info().Int("Count", len(stuckTxs)).
Msg("Broadcasting stuck transaction")
stuckTxsCounter.Add(float64(len(stuckTxs)))
go pool.broadcast(stuckTxs)
}
}
}
}
Expand Down Expand Up @@ -756,7 +815,10 @@ func (pool *TxPool) validateTx(tx types.PoolTransaction, local bool) error {
}
// Do more checks if it is a staking transaction
if isStakingTx {
return pool.validateStakingTx(stakingTx)
if err := pool.validateStakingTx(stakingTx); err != nil {
return errors.WithMessagef(ErrStaking, "staking validation failed: %v", err)
}
return nil
}
return nil
}
Expand Down Expand Up @@ -915,19 +977,25 @@ func (pool *TxPool) pendingEpoch() *big.Int {
// the pool due to pricing constraints.
func (pool *TxPool) add(tx types.PoolTransaction, local bool) (bool, error) {
logger := utils.Logger().With().Stack().Logger()

receivedTxsCounter.Inc()
// If the transaction is in the error sink, remove it as it may succeed
if pool.txErrorSink.Contains(tx.Hash().String()) {
pool.txErrorSink.Remove(tx)
}
// If the transaction is already known, discard it
// If the transaction is already known, refresh the beats timestamp, and
// discard it.
hash := tx.Hash()
if pool.all.Get(hash) != nil {
if cachedTx := pool.all.Get(hash); cachedTx != nil {
knownTxsCounter.Inc()
pool.handleKnownTx(cachedTx)
logger.Info().Str("hash", hash.Hex()).Msg("Discarding already known transaction")
return false, errors.WithMessagef(ErrKnownTransaction, "transaction hash %x", hash)
}
// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, local); err != nil {
logger.Warn().Err(err).Str("hash", hash.Hex()).Msg("Discarding invalid transaction")
invalidTxsCounterVec.With(prometheus.Labels{"err": errors.Cause(err).Error()}).Inc()
invalidTxCounter.Inc(1)
return false, err
}
Expand Down Expand Up @@ -1119,6 +1187,24 @@ func (pool *TxPool) promoteTx(addr common.Address, tx types.PoolTransaction) boo
return true
}

// handleKnownTx update the beats of the sender if the transaction is at the head of the
// pending address's txList.
func (pool *TxPool) handleKnownTx(tx types.PoolTransaction) {
sender, _ := tx.SenderAddress()
txList, ok := pool.pending[sender]
if !ok || txList == nil {
return
}
// only update sender's beat when tx is at the head
if headTx := txList.Peek(); headTx.Hash() == tx.Hash() {
utils.Logger().Info().Str("txHash", tx.Hash().String()).
Str("address", sender.String()).
Msg("updated beats at txPool")
pool.beats[sender] = time.Now()
}
return
}

// AddLocal enqueues a single transaction into the pool if it is valid, marking
// the sender as a local one in the mean time, ensuring it goes around the local
// pricing constraints.
Expand Down Expand Up @@ -1307,7 +1393,9 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
}
// Drop all transactions that are deemed too old (low nonce)
nonce := pool.currentState.GetNonce(addr)
for _, tx := range list.Forward(nonce) {
forwards := list.Forward(nonce)
lowNonceTxCounter.Add(float64(len(forwards)))
for _, tx := range forwards {
hash := tx.Hash()
pool.all.Remove(hash)
pool.priced.Removed()
Expand Down Expand Up @@ -1481,7 +1569,9 @@ func (pool *TxPool) demoteUnexecutables(bn uint64) {
nonce := pool.currentState.GetNonce(addr)

// Drop all transactions that are deemed too old (low nonce)
for _, tx := range list.Forward(nonce) {
forwards := list.Forward(nonce)
lowNonceTxCounter.Add(float64(len(forwards)))
for _, tx := range forwards {
hash := tx.Hash()
pool.all.Remove(hash)
pool.priced.Removed()
Expand Down Expand Up @@ -1524,6 +1614,30 @@ func (pool *TxPool) demoteUnexecutables(bn uint64) {
}
}

// stuckExecutables get the stuck executable transactions.
// A stuck transaction is defined as a transactions that have
// exist in TxPool.pending for duration of txStuckThreshold.
// Only the first transaction of each account is returned
func (pool *TxPool) stuckExecutables() types.PoolTransactions {
var stuckTxs types.PoolTransactions

for addr, list := range pool.pending {
head := list.Peek()
if head == nil {
continue
}
if beat, ok := pool.beats[addr]; ok && time.Since(beat) > txStuckThreshold {
// transaction is likely to be stuck, validate and
// add to stuckTxs
if err := pool.validateTx(head, false); err != nil {
continue
}
stuckTxs = append(stuckTxs, head)
}
}
return stuckTxs
}

// addressByHeartbeat is an account address tagged with its last activity timestamp.
type addressByHeartbeat struct {
address common.Address
Expand Down
Loading