diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 9b38fbb25..f10a3b29b 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -21,6 +21,7 @@ import ( "fmt" "math/big" "runtime" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -29,6 +30,7 @@ import ( "github.com/ethereum/go-ethereum/eth/fetcher" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p/enode" ) @@ -40,6 +42,8 @@ var ( // enqueueTx is a channel to enqueue transactions in parallel. // It is used to improve the performance of transaction enqueued. var enqueueTx = make(chan func(), TxQueueSize) +var parallelCounter int32 = 0 +var parallelCounterGuage = metrics.NewRegisteredGauge("p2p/enqueue/parallel", nil) func init() { log.Info("P2P euqneue parallel thread number", "threadNum", TxQueueSize) @@ -47,7 +51,17 @@ func init() { for i := 0; i < TxQueueSize; i++ { go func() { for enqueue := range enqueueTx { + atomic.AddInt32(¶llelCounter, 1) enqueue() + atomic.AddInt32(¶llelCounter, -1) + } + }() + } + if metrics.EnabledExpensive { + go func() { + for { + time.Sleep(1 * time.Second) + parallelCounterGuage.Update(int64(atomic.LoadInt32(¶llelCounter))) } }() }