Skip to content

Commit

Permalink
feat(dot/network) add network_is_major_syncing metric (ChainSafe#1697)
Browse files Browse the repository at this point in the history
* chore: transform goal into atomic

* add tests

* remove debug from polkadot config

* chore: add comment

* chore: remove comments

* change var name

Co-authored-by: Timothy Wu <[email protected]>

* update var name

* chore: add metrics port

* chore: add clean up to close dbs while testing

Co-authored-by: Timothy Wu <[email protected]>
  • Loading branch information
EclesioMeloJunior and timwu20 committed Dec 6, 2021
1 parent b31bba5 commit d0a6341
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 17 deletions.
1 change: 1 addition & 0 deletions chain/polkadot/config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[global]
basepath = "~/.gossamer/polkadot"
log = "info"
metrics-port = 9876

[log]
core = ""
Expand Down
1 change: 1 addition & 0 deletions dot/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ func PolkadotConfig() *Config {
LogLvl: polkadot.DefaultLvl,
RetainBlocks: gssmr.DefaultRetainBlocks,
Pruning: pruner.Mode(gssmr.DefaultPruningMode),
MetricsPort: gssmr.DefaultMetricsPort,
},
Log: LogConfig{
CoreLvl: polkadot.DefaultLvl,
Expand Down
4 changes: 1 addition & 3 deletions dot/network/block_announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,7 @@ func (s *Service) validateBlockAnnounceHandshake(peer peer.ID, hs Handshake) err
return nil
}

go func() {
s.syncQueue.handleBlockAnnounceHandshake(bhs.BestBlockNumber, peer)
}()
go s.syncQueue.handleBlockAnnounceHandshake(bhs.BestBlockNumber, peer)

return nil
}
Expand Down
16 changes: 16 additions & 0 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
transactionsID = "/transactions/1"

maxMessageSize = 1024 * 63 // 63kb for now

gssmrIsMajorSyncMetric = "gossamer/network/is_major_syncing"
)

var (
Expand Down Expand Up @@ -692,6 +694,20 @@ func (s *Service) NodeRoles() byte {
return s.cfg.Roles
}

// CollectGauge will be used to collect coutable metrics from network service
func (s *Service) CollectGauge() map[string]int64 {
var isSynced int64
if !s.syncer.IsSynced() {
isSynced = 1
} else {
isSynced = 0
}

return map[string]int64{
gssmrIsMajorSyncMetric: isSynced,
}
}

// HighestBlock returns the highest known block number
func (s *Service) HighestBlock() int64 {
return s.syncQueue.goal
Expand Down
18 changes: 18 additions & 0 deletions dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,3 +344,21 @@ func TestHandleConn(t *testing.T) {
require.True(t, ok)
require.Equal(t, 1, aScore)
}

func TestSerivceIsMajorSyncMetrics(t *testing.T) {
mocksyncer := new(MockSyncer)

node := &Service{
syncer: mocksyncer,
}

mocksyncer.On("IsSynced").Return(false).Once()
m := node.CollectGauge()

require.Equal(t, int64(1), m[gssmrIsMajorSyncMetric])

mocksyncer.On("IsSynced").Return(true).Once()
m = node.CollectGauge()

require.Equal(t, int64(0), m[gssmrIsMajorSyncMetric])
}
50 changes: 36 additions & 14 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"reflect"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/ChainSafe/gossamer/dot/types"
Expand Down Expand Up @@ -194,6 +195,7 @@ func (q *syncQueue) syncAtHead() {

t := time.NewTicker(q.slotDuration * 2)
defer t.Stop()

for {
select {
// sleep for average block time TODO: make this configurable from slot duration
Expand All @@ -207,10 +209,13 @@ func (q *syncQueue) syncAtHead() {
continue
}

goal := atomic.LoadInt64(&q.goal)

// we aren't at the head yet, sleep
if curr.Number.Int64() < q.goal && curr.Number.Cmp(prev.Number) > 0 {
if curr.Number.Int64() < goal && curr.Number.Cmp(prev.Number) > 0 {
prev = curr
q.s.noGossip = true
q.s.syncer.SetSyncing(true)
continue
}

Expand Down Expand Up @@ -247,10 +252,12 @@ func (q *syncQueue) handleResponseQueue() {
}

q.responseLock.Lock()
goal := atomic.LoadInt64(&q.goal)

if len(q.responses) == 0 {
q.responseLock.Unlock()

if len(q.requestCh) == 0 && head.Int64() < q.goal {
if len(q.requestCh) == 0 && head.Int64() < goal {
q.pushRequest(uint64(head.Int64()+1), blockRequestBufferSize, "")
}
continue
Expand Down Expand Up @@ -328,6 +335,9 @@ func (q *syncQueue) prunePeers() {
}

func (q *syncQueue) benchmark() {
t := time.NewTimer(time.Second * 5)
defer t.Stop()

for {
if q.ctx.Err() != nil {
return
Expand All @@ -338,19 +348,27 @@ func (q *syncQueue) benchmark() {
continue
}

if before.Number.Int64() >= q.goal {
goal := atomic.LoadInt64(&q.goal)

if before.Number.Int64() >= goal {
finalised, err := q.s.blockState.GetFinalisedHeader(0, 0) //nolint
if err != nil {
continue
}

logger.Info("💤 node waiting", "peer count", len(q.s.host.peers()), "head", before.Number, "finalised", finalised.Number)
time.Sleep(time.Second * 5)

// reset the counter and then wait 5 seconds
t.Reset(time.Second * 5)
<-t.C

continue
}

q.benchmarker.begin(before.Number.Uint64())
time.Sleep(time.Second * 5)

t.Reset(time.Second * 5)
<-t.C

after, err := q.s.blockState.BestBlockHeader()
if err != nil {
Expand All @@ -361,7 +379,7 @@ func (q *syncQueue) benchmark() {

logger.Info("🚣 currently syncing",
"peer count", len(q.s.host.peers()),
"goal", q.goal,
"goal", goal,
"average blocks/second", q.benchmarker.mostRecentAverage(),
"overall average", q.benchmarker.average(),
)
Expand Down Expand Up @@ -418,11 +436,13 @@ func (q *syncQueue) pushRequest(start uint64, numRequests int, to peer.ID) {
return
}

if q.goal < best.Int64() {
q.goal = best.Int64()
goal := atomic.LoadInt64(&q.goal)
if goal < best.Int64() {
atomic.StoreInt64(&q.goal, best.Int64())
}

if q.goal-int64(start) < int64(blockRequestSize) {
goal = atomic.LoadInt64(&q.goal)
if goal-int64(start) < int64(blockRequestSize) {
start := best.Int64() + 1
req := createBlockRequest(start, 0)

Expand All @@ -443,7 +463,7 @@ func (q *syncQueue) pushRequest(start uint64, numRequests int, to peer.ID) {
start = start - m + 1

for i := 0; i < numRequests; i++ {
if start > uint64(q.goal) {
if start > uint64(goal) {
return
}

Expand Down Expand Up @@ -833,11 +853,12 @@ func (q *syncQueue) handleBlockAnnounceHandshake(blockNum uint32, from peer.ID)
return
}

if bestNum.Int64() >= int64(blockNum) || q.goal >= int64(blockNum) {
goal := atomic.LoadInt64(&q.goal)
if bestNum.Int64() >= int64(blockNum) || goal >= int64(blockNum) {
return
}

q.goal = int64(blockNum)
atomic.StoreInt64(&q.goal, int64(blockNum))
q.pushRequest(uint64(bestNum.Int64()+1), blockRequestBufferSize, from)
}

Expand All @@ -856,8 +877,9 @@ func (q *syncQueue) handleBlockAnnounce(msg *BlockAnnounceMessage, from peer.ID)
return
}

if header.Number.Int64() > q.goal {
q.goal = header.Number.Int64()
goal := atomic.LoadInt64(&q.goal)
if header.Number.Int64() > goal {
atomic.StoreInt64(&q.goal, header.Number.Int64())
}

req := createBlockRequestWithHash(header.Hash(), blockRequestSize)
Expand Down
1 change: 1 addition & 0 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
c := metrics.NewCollector(context.Background())
c.AddGauge(fg)
c.AddGauge(stateSrvc)
c.AddGauge(networkSrvc)

go c.Start()

Expand Down
2 changes: 2 additions & 0 deletions lib/grandpa/grandpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func newTestState(t *testing.T) *state.Service {
db, err := utils.SetupDatabase(testDatadirPath, true)
require.NoError(t, err)

t.Cleanup(func() { db.Close() })

gen, genTrie, _ := genesis.NewTestGenesisWithTrieAndHeader(t)
block, err := state.NewBlockStateFromGenesis(db, testHeader)
require.NoError(t, err)
Expand Down

0 comments on commit d0a6341

Please sign in to comment.