diff --git a/src/index.ts b/src/index.ts index c34fc585..c540dc7f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -190,7 +190,7 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit { decodeRpcLimits?: DecodeRPCLimits /** - * If true, will utilize the libp2p connection manager tagging system to prune/graft connections to peers, defaults to false + * If true, will utilize the libp2p connection manager tagging system to prune/graft connections to peers, defaults to true */ tagMeshPeers: boolean } @@ -420,7 +420,7 @@ export class GossipSub extends TypedEventEmitter implements Pub fallbackToFloodsub: true, floodPublish: true, batchPublish: false, - tagMeshPeers: false, + tagMeshPeers: true, doPX: false, directPeers: [], D: constants.GossipsubD, @@ -1639,9 +1639,9 @@ export class GossipSub extends TypedEventEmitter implements Pub score, topicID ) - continue + } else { + await this.pxConnect(peers) } - await this.pxConnect(peers) } this.safeDispatchEvent('gossipsub:prune', { detail: { peerId: id, topic: topicID, direction: 'inbound' } }) @@ -2283,8 +2283,6 @@ export class GossipSub extends TypedEventEmitter implements Pub ] this.sendRpc(id, { control: { graft } }) - - this.safeDispatchEvent('gossipsub:graft', { detail: { peerId: id, topic, direction: 'outbound' } }) } /** @@ -2296,8 +2294,6 @@ export class GossipSub extends TypedEventEmitter implements Pub const prune = [await this.makePrune(id, topic, this.opts.doPX, onUnsubscribe)] this.sendRpc(id, { control: { prune } }) - - this.safeDispatchEvent('gossipsub:prune', { detail: { peerId: id, topic, direction: 'outbound' } }) } /** @@ -2343,6 +2339,21 @@ export class GossipSub extends TypedEventEmitter implements Pub this.metrics?.onRpcSent(rpc, rpcBytes.length) + if (rpc.control?.graft != null) { + for (const topic of rpc.control?.graft) { + if (topic.topicID != null) { + this.safeDispatchEvent('gossipsub:graft', { detail: { peerId: id, topic: topic.topicID, direction: 'outbound' } }) + } + } + } + if (rpc.control?.prune != null) { + for (const topic of rpc.control?.prune) { + if (topic.topicID != null) { + this.safeDispatchEvent('gossipsub:prune', { detail: { peerId: id, topic: topic.topicID, direction: 'outbound' } }) + } + } + } + return true } diff --git a/test/gossip.spec.ts b/test/gossip.spec.ts index 9736fe4d..11a09aa0 100644 --- a/test/gossip.spec.ts +++ b/test/gossip.spec.ts @@ -27,8 +27,7 @@ describe('gossip', () => { IPColocationFactorThreshold: GossipsubDhi + 3 }, maxInboundDataLength: 4000000, - allowPublishToZeroPeers: false, - tagMeshPeers: true + allowPublishToZeroPeers: false } }) }) @@ -100,7 +99,6 @@ describe('gossip', () => { expect(publishResult.recipients).to.deep.equal([]) }) - // flakey test it('should tag peers', async function () { this.timeout(10e4) const nodeA = nodes[0] @@ -109,32 +107,24 @@ describe('gossip', () => { const twoNodes = [nodeA, nodeB] - const subscriptionPromises = twoNodes.map(async (n) => pEvent(n.pubsub, 'subscription-change')) + const graftPromises = twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:graft')) + // add subscriptions to each node twoNodes.forEach((n) => { n.pubsub.subscribe(topic) }) // every node connected to every other await connectAllPubSubNodes(twoNodes) - // await for subscriptions to be transmitted - await Promise.all(subscriptionPromises) + // await grafts + await Promise.all(graftPromises) // await mesh rebalancing await Promise.all(twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat'))) - let peerInfo - try { - peerInfo = await nodeA.components.peerStore.get(nodeB.components.peerId) - } catch (err: any) { - // if it's not in node A's peerstore, then try node B's - if (err.code === 'ERR_NOT_FOUND') { - peerInfo = await nodeB.components.peerStore.get(nodeA.components.peerId) - } - } - if (peerInfo == null) { - throw new Error('Peer info not found') - } - expect(peerInfo.tags.get(topic)?.value).to.equal(100) + const peerInfoA = await nodeA.components.peerStore.get(nodeB.components.peerId).catch((e) => undefined) + const peerInfoB = await nodeB.components.peerStore.get(nodeA.components.peerId).catch((e) => undefined) + expect(peerInfoA?.tags.get(topic)?.value).to.equal(100) + expect(peerInfoB?.tags.get(topic)?.value).to.equal(100) }) it('should remove the tags upon pruning', async function () { @@ -158,11 +148,16 @@ describe('gossip', () => { // await mesh rebalancing await Promise.all(twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat'))) - nodeA.pubsub.unsubscribe(topic) + twoNodes.forEach((n) => { n.pubsub.unsubscribe(topic) }) - // await for subscriptions to be transmitted - await Promise.all(subscriptionPromises) - expect((await nodeA.components.peerStore.get(nodeB.components.peerId)).tags.get(topic)).to.be.undefined() + // await for unsubscriptions to be transmitted + // await mesh rebalancing + await Promise.all(twoNodes.map(async (n) => pEvent(n.pubsub, 'gossipsub:heartbeat'))) + + const peerInfoA = await nodeA.components.peerStore.get(nodeB.components.peerId).catch((e) => undefined) + const peerInfoB = await nodeB.components.peerStore.get(nodeA.components.peerId).catch((e) => undefined) + expect(peerInfoA?.tags.get(topic)).to.be.undefined() + expect(peerInfoB?.tags.get(topic)).to.be.undefined() }) it('should reject incoming messages bigger than maxInboundDataLength limit', async function () {