Skip to content

Commit

Permalink
chore: pr review
Browse files Browse the repository at this point in the history
  • Loading branch information
wemeetagain committed Feb 7, 2024
1 parent ea9d629 commit 525a91f
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 31 deletions.
27 changes: 19 additions & 8 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit {
decodeRpcLimits?: DecodeRPCLimits

/**
* If true, will utilize the libp2p connection manager tagging system to prune/graft connections to peers, defaults to false
* If true, will utilize the libp2p connection manager tagging system to prune/graft connections to peers, defaults to true
*/
tagMeshPeers: boolean
}
Expand Down Expand Up @@ -420,7 +420,7 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
fallbackToFloodsub: true,
floodPublish: true,
batchPublish: false,
tagMeshPeers: false,
tagMeshPeers: true,
doPX: false,
directPeers: [],
D: constants.GossipsubD,
Expand Down Expand Up @@ -1639,9 +1639,9 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
score,
topicID
)
continue
} else {
await this.pxConnect(peers)
}
await this.pxConnect(peers)
}

this.safeDispatchEvent<MeshPeer>('gossipsub:prune', { detail: { peerId: id, topic: topicID, direction: 'inbound' } })
Expand Down Expand Up @@ -2283,8 +2283,6 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
]

this.sendRpc(id, { control: { graft } })

this.safeDispatchEvent<MeshPeer>('gossipsub:graft', { detail: { peerId: id, topic, direction: 'outbound' } })
}

/**
Expand All @@ -2296,8 +2294,6 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub
const prune = [await this.makePrune(id, topic, this.opts.doPX, onUnsubscribe)]

this.sendRpc(id, { control: { prune } })

this.safeDispatchEvent<MeshPeer>('gossipsub:prune', { detail: { peerId: id, topic, direction: 'outbound' } })
}

/**
Expand Down Expand Up @@ -2343,6 +2339,21 @@ export class GossipSub extends TypedEventEmitter<GossipsubEvents> implements Pub

this.metrics?.onRpcSent(rpc, rpcBytes.length)

if (rpc.control?.graft != null) {
for (const topic of rpc.control?.graft) {
if (topic.topicID != null) {
this.safeDispatchEvent<MeshPeer>('gossipsub:graft', { detail: { peerId: id, topic: topic.topicID, direction: 'outbound' } })
}
}
}
if (rpc.control?.prune != null) {
for (const topic of rpc.control?.prune) {
if (topic.topicID != null) {
this.safeDispatchEvent<MeshPeer>('gossipsub:prune', { detail: { peerId: id, topic: topic.topicID, direction: 'outbound' } })
}
}
}

return true
}

Expand Down
41 changes: 18 additions & 23 deletions test/gossip.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ describe('gossip', () => {
IPColocationFactorThreshold: GossipsubDhi + 3
},
maxInboundDataLength: 4000000,
allowPublishToZeroPeers: false,
tagMeshPeers: true
allowPublishToZeroPeers: false
}
})
})
Expand Down Expand Up @@ -100,7 +99,6 @@ describe('gossip', () => {
expect(publishResult.recipients).to.deep.equal([])
})

// flakey test
it('should tag peers', async function () {
this.timeout(10e4)
const nodeA = nodes[0]
Expand All @@ -109,32 +107,24 @@ describe('gossip', () => {

const twoNodes = [nodeA, nodeB]

const subscriptionPromises = twoNodes.map(async (n) => pEvent(n.pubsub, 'subscription-change'))
const graftPromises = twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:graft'))

// add subscriptions to each node
twoNodes.forEach((n) => { n.pubsub.subscribe(topic) })

// every node connected to every other
await connectAllPubSubNodes(twoNodes)

// await for subscriptions to be transmitted
await Promise.all(subscriptionPromises)
// await grafts
await Promise.all(graftPromises)

// await mesh rebalancing
await Promise.all(twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat')))

let peerInfo
try {
peerInfo = await nodeA.components.peerStore.get(nodeB.components.peerId)
} catch (err: any) {
// if it's not in node A's peerstore, then try node B's
if (err.code === 'ERR_NOT_FOUND') {
peerInfo = await nodeB.components.peerStore.get(nodeA.components.peerId)
}
}
if (peerInfo == null) {
throw new Error('Peer info not found')
}
expect(peerInfo.tags.get(topic)?.value).to.equal(100)
const peerInfoA = await nodeA.components.peerStore.get(nodeB.components.peerId).catch((e) => undefined)
const peerInfoB = await nodeB.components.peerStore.get(nodeA.components.peerId).catch((e) => undefined)
expect(peerInfoA?.tags.get(topic)?.value).to.equal(100)
expect(peerInfoB?.tags.get(topic)?.value).to.equal(100)
})

it('should remove the tags upon pruning', async function () {
Expand All @@ -158,11 +148,16 @@ describe('gossip', () => {
// await mesh rebalancing
await Promise.all(twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat')))

nodeA.pubsub.unsubscribe(topic)
twoNodes.forEach((n) => { n.pubsub.unsubscribe(topic) })

// await for subscriptions to be transmitted
await Promise.all(subscriptionPromises)
expect((await nodeA.components.peerStore.get(nodeB.components.peerId)).tags.get(topic)).to.be.undefined()
// await for unsubscriptions to be transmitted
// await mesh rebalancing
await Promise.all(twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat')))

const peerInfoA = await nodeA.components.peerStore.get(nodeB.components.peerId).catch((e) => undefined)
const peerInfoB = await nodeB.components.peerStore.get(nodeA.components.peerId).catch((e) => undefined)
expect(peerInfoA?.tags.get(topic)).to.be.undefined()
expect(peerInfoB?.tags.get(topic)).to.be.undefined()
})

it('should reject incoming messages bigger than maxInboundDataLength limit', async function () {
Expand Down

0 comments on commit 525a91f

Please sign in to comment.