From c8be43e5acd6a74cfdd01857343af6f6d8210d5d Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Thu, 23 Feb 2023 11:31:00 +0000 Subject: [PATCH] feat: add get subscribers for pubsub topics (#184) With this implemented we can remove some of the arbitrary delays in the interop tests, instead we can wait until we see the remote node in the topic peer list. --- packages/libp2p-daemon-client/src/index.ts | 1 + packages/libp2p-daemon-client/src/pubsub.ts | 36 +++++++++++++++++++ .../libp2p-daemon-client/test/pubsub.spec.ts | 30 ++++++++++++++++ packages/libp2p-daemon-server/src/index.ts | 7 ++++ packages/libp2p-daemon-server/src/pubsub.ts | 14 ++++++++ 5 files changed, 88 insertions(+) diff --git a/packages/libp2p-daemon-client/src/index.ts b/packages/libp2p-daemon-client/src/index.ts index 79298df3..ef56b204 100644 --- a/packages/libp2p-daemon-client/src/index.ts +++ b/packages/libp2p-daemon-client/src/index.ts @@ -295,6 +295,7 @@ export interface PubSubClient { publish: (topic: string, data: Uint8Array) => Promise subscribe: (topic: string) => AsyncIterable getTopics: () => Promise + getSubscribers: (topic: string) => Promise } export interface DaemonClient { diff --git a/packages/libp2p-daemon-client/src/pubsub.ts b/packages/libp2p-daemon-client/src/pubsub.ts index 69ff71b2..2133741e 100644 --- a/packages/libp2p-daemon-client/src/pubsub.ts +++ b/packages/libp2p-daemon-client/src/pubsub.ts @@ -6,6 +6,8 @@ import { PSMessage } from '@libp2p/daemon-protocol' import type { DaemonClient } from './index.js' +import type { PeerId } from '@libp2p/interface-peer-id' +import { peerIdFromBytes } from '@libp2p/peer-id' export class Pubsub { private readonly client: DaemonClient @@ -123,4 +125,38 @@ export class Pubsub { yield PSMessage.decode(message) } } + + async getSubscribers (topic: string): Promise { + if (typeof topic !== 'string') { + throw new CodeError('invalid topic received', 'ERR_INVALID_TOPIC') + } + + const sh = await this.client.send({ + type: Request.Type.PUBSUB, + pubsub: { + type: PSRequest.Type.LIST_PEERS, + topic + } + }) + + const message = await sh.read() + + if (message == null) { + throw new CodeError('Empty response from remote', 'ERR_EMPTY_RESPONSE') + } + + const response = Response.decode(message) + + await sh.close() + + if (response.type !== Response.Type.OK) { + throw new CodeError(response.error?.msg ?? 'Pubsub get subscribers failed', 'ERR_PUBSUB_GET_SUBSCRIBERS_FAILED') + } + + if (response.pubsub == null || response.pubsub.topics == null) { + throw new CodeError('Invalid response', 'ERR_PUBSUB_GET_SUBSCRIBERS_FAILED') + } + + return response.pubsub.peerIDs.map(buf => peerIdFromBytes(buf)) + } } diff --git a/packages/libp2p-daemon-client/test/pubsub.spec.ts b/packages/libp2p-daemon-client/test/pubsub.spec.ts index 07cb1087..b21e6af9 100644 --- a/packages/libp2p-daemon-client/test/pubsub.spec.ts +++ b/packages/libp2p-daemon-client/test/pubsub.spec.ts @@ -8,6 +8,7 @@ import { createClient, DaemonClient } from '../src/index.js' import { multiaddr } from '@multiformats/multiaddr' import { StubbedInstance, stubInterface } from 'sinon-ts' import type { PubSub } from '@libp2p/interface-pubsub' +import { peerIdFromString } from '@libp2p/peer-id' const defaultMultiaddr = multiaddr('/ip4/0.0.0.0/tcp/12345') @@ -88,4 +89,33 @@ describe('daemon pubsub client', function () { await expect(client.pubsub.publish(topic, data)).to.eventually.be.rejectedWith(/Urk!/) }) }) + + describe('getSubscribers', () => { + it('should get empty list of topics when no subscriptions exist', async () => { + pubsub.getSubscribers.returns([]) + + const topic = 'test-topic' + const topics = await client.pubsub.getSubscribers(topic) + + expect(topics).to.have.lengthOf(0) + }) + + it('should get a list with a peer when subscribed', async () => { + const topic = 'test-topic' + const peer = peerIdFromString('12D3KooWKnQbfH5t1XxJW5FBoMGNjmC9LTSbDdRJxtYj2bJV5XfP') + pubsub.getSubscribers.withArgs(topic).returns([peer]) + + const peers = await client.pubsub.getSubscribers(topic) + + expect(peers).to.have.lengthOf(1) + expect(peers[0].toString()).to.equal(peer.toString()) + }) + + it('should error if receive an error message', async () => { + const topic = 'test-topic' + pubsub.getSubscribers.throws(new Error('Urk!')) + + await expect(client.pubsub.getSubscribers(topic)).to.eventually.be.rejectedWith(/Urk!/) + }) + }) }) diff --git a/packages/libp2p-daemon-server/src/index.ts b/packages/libp2p-daemon-server/src/index.ts index 772e953f..c9810b25 100644 --- a/packages/libp2p-daemon-server/src/index.ts +++ b/packages/libp2p-daemon-server/src/index.ts @@ -297,6 +297,13 @@ export class Server implements Libp2pServer { yield * this.pubsubOperations.publish(request.topic, request.data) return + case PSRequest.Type.LIST_PEERS: + if (request.topic == null) { + throw new Error('Invalid request') + } + + yield * this.pubsubOperations.listPeers(request.topic) + return default: throw new Error('ERR_INVALID_REQUEST_TYPE') } diff --git a/packages/libp2p-daemon-server/src/pubsub.ts b/packages/libp2p-daemon-server/src/pubsub.ts index d9e53423..6c24ea07 100644 --- a/packages/libp2p-daemon-server/src/pubsub.ts +++ b/packages/libp2p-daemon-server/src/pubsub.ts @@ -84,4 +84,18 @@ export class PubSubOperations { yield ErrorResponse(err) } } + + async * listPeers (topic: string): AsyncGenerator { + try { + yield OkResponse({ + pubsub: { + topics: [topic], + peerIDs: this.pubsub.getSubscribers(topic).map(peer => peer.toBytes()) + } + }) + } catch (err: any) { + log.error(err) + yield ErrorResponse(err) + } + } }