Skip to content

Commit

Permalink
Add metrics and revamp logic (#206)
Browse files Browse the repository at this point in the history
* Add metrics and revamp logic

Declare prometheus metrics in gossipsub

Add missing metric labels

Fix metrics types

Add onReportValidation

* Compile tests

* Fix tests

* Customize await policy

* Run prettier on files

* Relax heartbeat test condition

* Re-add util package for browser testing

* Remove unused dependencies

* Add getMeshPeers

* Fix rpc metric name

* Improve metrics

* More gossip promise metrics

* Fix metrics typo

* Clarify metrics names

* Track metric of score deltas

* Fix score metrics

* Add buckets for gossipsub_score_cache_delta

* Review PeerScore logic

* Dump peer stats

* Fix PeerStats constructor

* Enable strict typescript checks

* Fix compute-score logic

* f

* Fix peer stats types

* Reenable go-gossipsub tests (#201)

* Add checkReceivedSubscriptions

* Add checkReceivedSubscriptions to 'test gossipsub multihops'

* Add checkReceivedSubscriptions to 'test gossipsub tree topology'

* test gossipsub star topology with signed peer records

* Fix 'test gossipsub direct peers'

* Fix 'test gossipsub flood publish'

* Fix 'test gossipsub star topology with signed peer records'

* 'direct peers' test: wait for subscriptions event again

* 'direct peer': await for 2 peer:connect events

* 'direct peers': add missing Promise.all

* Expect topic peers to contain peer id

* Fix test types

* Set as connected in addPeer()

* Prune publishedMessageIds

* Fix markFirstMessageDelivery typo

* Same logic in scoreMetrics

* More metrics for p3 and p7 (#213)

* Add behaviourPenalty metrics

* Add duplicateMsgDelivery metric

* Observe topic and peer in duplicateMsgDelivery topic

* Remove peerId from duplicateMsgDelivery metric

* Use min meshMessageDeliveriesWindow

* Use topic label for duplicateMsgDelivery metric

* Record duplicateMsgDelivery in all cases

* Forward messages to floodsub peers (#214)

* Forward messages to floodsub peers

* Add comments

Co-authored-by: Lion - dapplion <[email protected]>

* Add missing msgId in validateReceivedMessage (#215)

* Fix 'test gossipsub opportunistic grafting'

* Fix tests in browser

* Fix tests suspended issue in browsers

* Fix 'test gossipsub fanout expiry' go-gossipsub test

* GossipsubIWantFollowupTime as a param (#216)

* Increase resolution of delay metrics (#217)

* Fix minMeshMessageDeliveriesWindow (#218)

* Fix minMeshMessageDeliveriesWindow

* Fix gossipsubIWantFollowupTime for metric

* Fix tracer prune()

* Change to maxMeshMessageDeliveriesWindow

* Use maxMeshMessageDeliveriesWindowSec for metric

* Rename gossipsubIWantFollowupMs option, revert/correct tracer.prune() logic

* Reset behaviourPenalty histogram to track current count

* publish(): return number of sent peers

* Add getScore()

* makePrune: update PeerStat so that we don't apply p3 penalty

* Remove redundant this.score.prune() in heartbeat

* Track iasked cache size per heartbeat and remove TODOs

* validateReceivedMessage: check duplicate message first (#223)

Co-authored-by: tuyennhv <[email protected]>
  • Loading branch information
dapplion and twoeths authored Apr 5, 2022
1 parent 25c9aa1 commit baf84f8
Show file tree
Hide file tree
Showing 46 changed files with 3,755 additions and 1,334 deletions.
5 changes: 4 additions & 1 deletion .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ module.exports = {
}
],
'@typescript-eslint/indent': 'off', // This is the job of StandardJS, they are competing rules so we turn off the Typescript one.
'node/no-unsupported-features/es-syntax': 'off', // Allows us to use Import and Export keywords.
'@typescript-eslint/no-non-null-assertion': 'off',
'@typescript-eslint/member-delimiter-style': 'off',
'node/no-unsupported-features/es-syntax': 'off', // Allows us to use Import and Export keywords.
"@typescript-eslint/strict-boolean-expressions": [
"error",
{
Expand All @@ -42,6 +43,8 @@ module.exports = {
'no-mixed-operators': 'off',
'space-before-function-paren': 'off',
'comma-dangle': 'off',
// Allow to place comments before the else {} block
'brace-style': 'off',
indent: 'off'
}
}
13 changes: 8 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@
"debug": "^4.3.1",
"denque": "^1.5.0",
"err-code": "^3.0.1",
"iso-random-stream": "^2.0.2",
"it-pipe": "^1.1.0",
"libp2p-interfaces": "^4.0.4",
"libp2p-crypto": "^0.21.2",
"libp2p-interfaces": "4.0.4",
"multiformats": "^9.6.4",
"peer-id": "^0.16.0",
"protobufjs": "^6.11.2",
"uint8arrays": "^3.0.0"
Expand Down Expand Up @@ -75,10 +78,10 @@
"eslint-plugin-standard": "^4.0.1",
"it-pair": "^1.0.0",
"libp2p": "0.36.1",
"libp2p-floodsub": "^0.29.0",
"libp2p-interfaces-compliance-tests": "^4.0.6",
"libp2p-mplex": "^0.10.3",
"libp2p-websockets": "^0.16.1",
"libp2p-floodsub": "^0.29.1",
"libp2p-interfaces-compliance-tests": "^4.0.8",
"libp2p-mplex": "^0.10.7",
"libp2p-websockets": "^0.16.2",
"lodash": "^4.17.15",
"multiaddr": "^10.0.0",
"os": "^0.1.1",
Expand Down
128 changes: 79 additions & 49 deletions test/2-nodes.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import chai from 'chai'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import delay from 'delay'
import Gossipsub, { multicodec } from '../ts'
import { createGossipsubs, createConnectedGossipsubs, expectSet, stopNode, first } from './utils'
import Gossipsub from '../ts'
import { createGossipsubs, createPubsubs, createConnectedGossipsubs, expectSet, stopNode, first } from './utils'
import { RPC } from '../ts/message/rpc'
import { InMessage, PeerId } from 'libp2p-interfaces/src/pubsub'
import PubsubBaseProtocol, { PeerId } from 'libp2p-interfaces/src/pubsub'
import { FloodsubID, GossipsubIDv11 } from '../ts/constants'
import { GossipsubMessage } from '../ts/types'

chai.use(require('dirty-chai'))
chai.use(require('chai-spies'))
Expand All @@ -13,6 +15,28 @@ const expect = chai.expect
const shouldNotHappen = () => expect.fail()

describe('2 nodes', () => {
describe('Pubsub dial', () => {
let nodes: PubsubBaseProtocol[]

// Create pubsub nodes
before(async () => {
nodes = await createPubsubs({ number: 2 })
})

after(() => Promise.all(nodes.map(stopNode)))

it('Dial from nodeA to nodeB happened with pubsub', async () => {
await nodes[0]._libp2p.dialProtocol(nodes[1]._libp2p.peerId, FloodsubID)

while (nodes[0]['peers'].size === 0 || nodes[1]['peers'].size === 0) {
await delay(10)
}

expect(nodes[0]['peers'].size).to.be.eql(1)
expect(nodes[1]['peers'].size).to.be.eql(1)
})
})

describe('basics', () => {
let nodes: Gossipsub[] = []

Expand All @@ -24,15 +48,14 @@ describe('2 nodes', () => {
after(() => Promise.all(nodes.map(stopNode)))

it('Dial from nodeA to nodeB happened with pubsub', async () => {
await nodes[0]._libp2p.dialProtocol(nodes[1]._libp2p.peerId, multicodec)
await delay(10)
await Promise.all([
new Promise((resolve) => nodes[0].once('gossipsub:heartbeat', resolve)),
new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve))
])
await nodes[0]._libp2p.dialProtocol(nodes[1]._libp2p.peerId, GossipsubIDv11)

expect(nodes[0].peers.size).to.be.eql(1)
expect(nodes[1].peers.size).to.be.eql(1)
while (nodes[0]['peers'].size === 0 || nodes[1]['peers'].size === 0) {
await delay(10)
}

expect(nodes[0]['peers'].size).to.be.eql(1)
expect(nodes[1]['peers'].size).to.be.eql(1)
})
})

Expand All @@ -47,7 +70,14 @@ describe('2 nodes', () => {
after(() => Promise.all(nodes.map(stopNode)))

it('Subscribe to a topic', async () => {
const topic = 'Z'
const topic = 'test_topic'

// await subscription change, after calling subscribe
const subscriptionEventPromise = Promise.all([
new Promise((resolve) => nodes[0].once('pubsub:subscription-change', (...args) => resolve(args))),
new Promise((resolve) => nodes[1].once('pubsub:subscription-change', (...args) => resolve(args)))
])

nodes[0].subscribe(topic)
nodes[1].subscribe(topic)

Expand All @@ -61,14 +91,14 @@ describe('2 nodes', () => {

const [changedPeerId, changedSubs] = evt0 as [PeerId, RPC.ISubOpts[]]

expectSet(nodes[0].subscriptions, [topic])
expectSet(nodes[1].subscriptions, [topic])
expect(nodes[0].peers.size).to.equal(1)
expect(nodes[1].peers.size).to.equal(1)
expectSet(nodes[0].topics.get(topic), [nodes[1].peerId.toB58String()])
expectSet(nodes[1].topics.get(topic), [nodes[0].peerId.toB58String()])
expectSet(nodes[0]['subscriptions'], [topic])
expectSet(nodes[1]['subscriptions'], [topic])
expect(nodes[0]['peers'].size).to.equal(1)
expect(nodes[1]['peers'].size).to.equal(1)
expectSet(nodes[0]['topics'].get(topic), [nodes[1].peerId.toB58String()])
expectSet(nodes[1]['topics'].get(topic), [nodes[0].peerId.toB58String()])

expect(changedPeerId.toB58String()).to.equal(first(nodes[0].peers).id.toB58String())
expect(changedPeerId.toB58String()).to.equal(first(nodes[0]['peers']).id.toB58String())
expect(changedSubs).to.have.lengthOf(1)
expect(changedSubs[0].topicID).to.equal(topic)
expect(changedSubs[0].subscribe).to.equal(true)
Expand All @@ -79,8 +109,8 @@ describe('2 nodes', () => {
new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve))
])

expect(first(nodes[0].mesh.get(topic))).to.equal(first(nodes[0].peers).id.toB58String())
expect(first(nodes[1].mesh.get(topic))).to.equal(first(nodes[1].peers).id.toB58String())
expect(first(nodes[0]['mesh'].get(topic))).to.equal(first(nodes[0]['peers']).id.toB58String())
expect(first(nodes[1]['mesh'].get(topic))).to.equal(first(nodes[1]['peers']).id.toB58String())
})
})

Expand Down Expand Up @@ -109,29 +139,29 @@ describe('2 nodes', () => {
afterEach(() => Promise.all(nodes.map(stopNode)))

it('Publish to a topic - nodeA', async () => {
const promise = new Promise<InMessage>((resolve) => nodes[1].once(topic, resolve))
const promise = new Promise<GossipsubMessage>((resolve) => nodes[1].once(topic, resolve))
nodes[0].once(topic, (m) => shouldNotHappen)

nodes[0].publish(topic, uint8ArrayFromString('hey'))

const msg = await promise

expect(msg.data.toString()).to.equal('hey')
expect(msg.from).to.be.eql(nodes[0].peerId.toB58String())
expect(msg.from).to.be.eql(nodes[0].peerId.toBytes())

nodes[0].removeListener(topic, shouldNotHappen)
})

it('Publish to a topic - nodeB', async () => {
const promise = new Promise<InMessage>((resolve) => nodes[0].once(topic, resolve))
const promise = new Promise<GossipsubMessage>((resolve) => nodes[0].once(topic, resolve))
nodes[1].once(topic, shouldNotHappen)

nodes[1].publish(topic, uint8ArrayFromString('banana'))

const msg = await promise

expect(msg.data.toString()).to.equal('banana')
expect(msg.from).to.be.eql(nodes[1].peerId.toB58String())
expect(msg.from).to.be.eql(nodes[1].peerId.toBytes())

nodes[1].removeListener(topic, shouldNotHappen)
})
Expand All @@ -143,11 +173,11 @@ describe('2 nodes', () => {

nodes[0].on(topic, receivedMsg)

function receivedMsg(msg: InMessage) {
expect(msg.data.toString().startsWith('banana')).to.be.true
expect(msg.from).to.be.eql(nodes[1].peerId.toB58String())
function receivedMsg(msg: RPC.IMessage) {
expect(msg.data!.toString().startsWith('banana')).to.be.true
expect(msg.from).to.be.eql(nodes[1].peerId.toBytes())
expect(msg.seqno).to.be.a('Uint8Array')
expect(msg.topicIDs).to.be.eql([topic])
expect(msg.topic).to.be.eql(topic)

if (++counter === 10) {
nodes[0].removeListener(topic, receivedMsg)
Expand All @@ -168,7 +198,7 @@ describe('2 nodes', () => {

// Create pubsub nodes
beforeEach(async () => {
nodes = await createConnectedGossipsubs({ number: 2 })
nodes = await createConnectedGossipsubs({ number: 2, options: {allowPublishToZeroPeers: true} })
})

// Create subscriptions
Expand All @@ -188,16 +218,16 @@ describe('2 nodes', () => {

it('Unsubscribe from a topic', async () => {
nodes[0].unsubscribe(topic)
expect(nodes[0].subscriptions.size).to.equal(0)
expect(nodes[0]['subscriptions'].size).to.equal(0)

const [changedPeerId, changedSubs] = await new Promise<[PeerId, RPC.ISubOpts[]]>((resolve) => {
nodes[1].once('pubsub:subscription-change', (...args: [PeerId, RPC.ISubOpts[]]) => resolve(args))
})
await new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve))

expect(nodes[1].peers.size).to.equal(1)
expectSet(nodes[1].topics.get(topic), [])
expect(changedPeerId.toB58String()).to.equal(first(nodes[1].peers).id.toB58String())
expect(nodes[1]['peers'].size).to.equal(1)
expectSet(nodes[1]['topics'].get(topic), [])
expect(changedPeerId.toB58String()).to.equal(first(nodes[1]['peers']).id.toB58String())
expect(changedSubs).to.have.lengthOf(1)
expect(changedSubs[0].topicID).to.equal(topic)
expect(changedSubs[0].subscribe).to.equal(false)
Expand Down Expand Up @@ -245,10 +275,10 @@ describe('2 nodes', () => {
nodes[0].subscribe('Za')
nodes[1].subscribe('Zb')

expect(nodes[0].peers.size).to.equal(0)
expectSet(nodes[0].subscriptions, ['Za'])
expect(nodes[1].peers.size).to.equal(0)
expectSet(nodes[1].subscriptions, ['Zb'])
expect(nodes[0]['peers'].size).to.equal(0)
expectSet(nodes[0]['subscriptions'], ['Za'])
expect(nodes[1]['peers'].size).to.equal(0)
expectSet(nodes[1]['subscriptions'], ['Zb'])
})

after(() => Promise.all(nodes.map(stopNode)))
Expand All @@ -257,20 +287,20 @@ describe('2 nodes', () => {
this.timeout(5000)

await Promise.all([
nodes[0]._libp2p.dialProtocol(nodes[1]._libp2p.peerId, multicodec),
nodes[0]._libp2p.dialProtocol(nodes[1]._libp2p.peerId, GossipsubIDv11),
new Promise((resolve) => nodes[0].once('pubsub:subscription-change', resolve)),
new Promise((resolve) => nodes[1].once('pubsub:subscription-change', resolve))
])
expect(nodes[0].peers.size).to.equal(1)
expect(nodes[1].peers.size).to.equal(1)
expect(nodes[0]['peers'].size).to.equal(1)
expect(nodes[1]['peers'].size).to.equal(1)

expectSet(nodes[0].subscriptions, ['Za'])
expect(nodes[1].peers.size).to.equal(1)
expectSet(nodes[1].topics.get('Za'), [nodes[0].peerId.toB58String()])
expectSet(nodes[0]['subscriptions'], ['Za'])
expect(nodes[1]['peers'].size).to.equal(1)
expectSet(nodes[1]['topics'].get('Za'), [nodes[0].peerId.toB58String()])

expectSet(nodes[1].subscriptions, ['Zb'])
expect(nodes[0].peers.size).to.equal(1)
expectSet(nodes[0].topics.get('Zb'), [nodes[1].peerId.toB58String()])
expectSet(nodes[1]['subscriptions'], ['Zb'])
expect(nodes[0]['peers'].size).to.equal(1)
expectSet(nodes[0]['topics'].get('Zb'), [nodes[1].peerId.toB58String()])
})
})

Expand All @@ -284,8 +314,8 @@ describe('2 nodes', () => {

it("nodes don't have peers after stopped", async () => {
await Promise.all(nodes.map(stopNode))
expect(nodes[0].peers.size).to.equal(0)
expect(nodes[1].peers.size).to.equal(0)
expect(nodes[0]['peers'].size).to.equal(0)
expect(nodes[1]['peers'].size).to.equal(0)
})
})
})
Loading

1 comment on commit baf84f8

@AtHeartEngineer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a big one, took a bit to merge into my fork, but I like the changes :)

Please sign in to comment.