Skip to content

Commit

Permalink
feat(telemetry): send txpool.import telemetry msg (ChainSafe#1966)
Browse files Browse the repository at this point in the history
From this commit, a telemetry message of type `txpool.import` will be
sent when a new transaction is imported in the transaction pool.

Added tests for `txpool.import` and improved TestHandler_SendMulti test.
  • Loading branch information
kishansagathiya authored and timwu20 committed Dec 6, 2021
1 parent 37d2807 commit 9f9c213
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 66 deletions.
2 changes: 1 addition & 1 deletion dot/state/block_finalisation.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (bs *BlockState) SetFinalisedHash(hash common.Hash, round, setID uint64) er
),
)
if err != nil {
return fmt.Errorf("could not send 'notify.finalized' telemetry message, error: %s", err)
logger.Debugf("could not send 'notify.finalized' telemetry message, error: %s", err)
}

bs.lastFinalised = hash
Expand Down
13 changes: 12 additions & 1 deletion dot/state/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package state
import (
"sync"

"github.com/ChainSafe/gossamer/dot/telemetry"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
"github.com/ChainSafe/gossamer/lib/transaction"
Expand Down Expand Up @@ -68,7 +70,16 @@ func (s *TransactionState) RemoveExtrinsicFromPool(ext types.Extrinsic) {
// AddToPool adds a transaction to the pool
func (s *TransactionState) AddToPool(vt *transaction.ValidTransaction) common.Hash {
s.notifyStatus(vt.Extrinsic, transaction.Future)
return s.pool.Insert(vt)

hash := s.pool.Insert(vt)

if err := telemetry.GetInstance().SendMessage(
telemetry.NewTxpoolImportTM(uint(s.queue.Len()), uint(s.pool.Len())),
); err != nil {
logger.Debugf("problem sending txpool.import telemetry message, error: %s", err)
}

return hash
}

// GetStatusNotifierChannel creates and returns a status notifier channel.
Expand Down
5 changes: 2 additions & 3 deletions dot/telemetry/prepared_block_for_proposing.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ func NewPreparedBlockForProposingTM(hash common.Hash, number string) Message {
return &preparedBlockForProposingTM{
Hash: hash,
Number: number,
Msg: "prepared_block_for_proposing",
}
}

func (tm *preparedBlockForProposingTM) messageType() string {
return tm.Msg
func (preparedBlockForProposingTM) messageType() string {
return preparedBlockForProposingMsg
}
12 changes: 7 additions & 5 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ import (

// telemetry message types
const (
notifyFinalizedMsg = "notify.finalized"
blockImportMsg = "block.import"
systemNetworkStateMsg = "system.network_state"
systemConnectedMsg = "system.connected"
systemIntervalMsg = "system.interval"
systemConnectedMsg = "system.connected"
systemIntervalMsg = "system.interval"
systemNetworkStateMsg = "system.network_state"
blockImportMsg = "block.import"
notifyFinalizedMsg = "notify.finalized"
txPoolImportMsg = "txpool.import"
preparedBlockForProposingMsg = "prepared_block_for_proposing"
)

type telemetryConnection struct {
Expand Down
103 changes: 47 additions & 56 deletions dot/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,75 +42,66 @@ func TestMain(m *testing.M) {
}

func TestHandler_SendMulti(t *testing.T) {
var wg sync.WaitGroup
wg.Add(6)

resultCh = make(chan []byte)

go func() {
genesisHash := common.MustHexToHash("0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3")

GetInstance().SendMessage(NewSystemConnectedTM(false, "chain", &genesisHash,
"systemName", "nodeName", "netID", "startTime", "0.1"))

wg.Done()
}()

go func() {
bh := common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6")
GetInstance().SendMessage(NewBlockImportTM(&bh, big.NewInt(2), "NetworkInitialSync"))

wg.Done()
}()

go func() {
GetInstance().SendMessage(NewBandwidthTM(2, 3, 1))

wg.Done()
}()

go func() {
bestHash := common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6")
finalisedHash := common.MustHexToHash("0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2")
GetInstance().SendMessage(NewBlockIntervalTM(&bestHash, big.NewInt(32375), &finalisedHash,
big.NewInt(32256), big.NewInt(0), big.NewInt(1234)))

wg.Done()
}()

go func() {
bestHash := common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6")
GetInstance().SendMessage(NewNotifyFinalizedTM(bestHash, "32375"))
expected := [][]byte{
[]byte(`{"authority":false,"chain":"chain","genesis_hash":"0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","ts":`),
[]byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":2,"msg":"block.import","origin":"NetworkInitialSync","ts":`),
[]byte(`{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1,"ts":`),
[]byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","ts":`), //nolint
[]byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":"32375","msg":"notify.finalized","ts":`),
[]byte(`{"hash":"0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c","msg":"prepared_block_for_proposing","number":"1","ts":`),
[]byte(`{"future":2,"msg":"txpool.import","ready":1,"ts":`),
}

wg.Done()
}()
messages := []Message{
NewBandwidthTM(2, 3, 1),
NewTxpoolImportTM(1, 2),

func(genesisHash common.Hash) Message {
return NewSystemConnectedTM(false, "chain", &genesisHash,
"systemName", "nodeName", "netID", "startTime", "0.1")
}(common.MustHexToHash("0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3")),

func(bh common.Hash) Message {
return NewBlockImportTM(&bh, big.NewInt(2), "NetworkInitialSync")
}(common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6")),

func(bestHash, finalisedHash common.Hash) Message {
return NewBlockIntervalTM(&bestHash, big.NewInt(32375), &finalisedHash,
big.NewInt(32256), big.NewInt(0), big.NewInt(1234))
}(
common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6"),
common.MustHexToHash("0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2"),
),

NewNotifyFinalizedTM(common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6"), "32375"),
NewPreparedBlockForProposingTM(common.MustHexToHash("0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c"), "1"),
}

go func() {
bestHash := common.MustHexToHash("0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c")
GetInstance().SendMessage(NewPreparedBlockForProposingTM(bestHash, "1"))
resultCh = make(chan []byte)

wg.Done()
}()
var wg sync.WaitGroup
for _, message := range messages {
wg.Add(1)
go func(msg Message) {
GetInstance().SendMessage(msg)
wg.Done()
}(message)
}

wg.Wait()

expected1 := []byte(`{"authority":false,"chain":"chain","genesis_hash":"0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","ts":`)
expected2 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":2,"msg":"block.import","origin":"NetworkInitialSync","ts":`)
expected3 := []byte(`{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1,"ts":`)
expected4 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","ts":`) // nolint
expected5 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":"32375","msg":"notify.finalized","ts":`)
expected6 := []byte(`{"hash":"0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c","msg":"prepared_block_for_proposing","number":"1","ts":`)

expected := [][]byte{expected1, expected3, expected4, expected5, expected2, expected6}

var actual [][]byte
for data := range resultCh {
actual = append(actual, data)
if len(actual) == 6 {
if len(actual) == len(expected) {
break
}
}

sort.Slice(expected, func(i, j int) bool {
return bytes.Compare(expected[i], expected[j]) < 0
})

sort.Slice(actual, func(i, j int) bool {
return bytes.Compare(actual[i], actual[j]) < 0
})
Expand Down
36 changes: 36 additions & 0 deletions dot/telemetry/txpool_import.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2021 ChainSafe Systems (ON) Corp.
// This file is part of gossamer.
//
// The gossamer library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The gossamer library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the gossamer library. If not, see <http://www.gnu.org/licenses/>.

package telemetry

// txpoolImportTM holds `txpool.import` telemetry message, which is supposed to be
// sent when a new transaction gets imported in the transaction pool.
type txpoolImportTM struct {
Ready uint `json:"ready"`
Future uint `json:"future"`
}

// NewTxpoolImportTM creates a new txpoolImportTM struct
func NewTxpoolImportTM(ready, future uint) Message {
return &txpoolImportTM{
Ready: ready,
Future: future,
}
}

func (txpoolImportTM) messageType() string {
return txPoolImportMsg
}

0 comments on commit 9f9c213

Please sign in to comment.