diff --git a/package.json b/package.json index 63722f9aef..ae6bda023d 100644 --- a/package.json +++ b/package.json @@ -163,7 +163,7 @@ "scripts": { "clean": "aegir clean", "lint": "aegir lint", - "dep-check": "aegir dep-check", + "dep-check": "aegir dep-check -i protons", "build": "aegir build", "generate": "protons test/message/rpc.proto", "test": "aegir test", @@ -187,7 +187,6 @@ "@libp2p/peer-collections": "^3.0.0", "@libp2p/peer-id": "^2.0.0", "@libp2p/topology": "^4.0.0", - "@multiformats/multiaddr": "^11.0.0", "abortable-iterator": "^4.0.2", "it-length-prefixed": "^8.0.2", "it-pipe": "^2.0.3", @@ -199,14 +198,13 @@ }, "devDependencies": { "@libp2p/peer-id-factory": "^2.0.0", - "aegir": "^37.9.1", + "aegir": "^38.1.6", "delay": "^5.0.0", "it-pair": "^2.0.2", "p-defer": "^4.0.0", "p-wait-for": "^5.0.0", "protons": "^7.0.2", "protons-runtime": "^5.0.0", - "sinon": "^15.0.1", - "util": "^0.12.4" + "sinon": "^15.0.1" } } diff --git a/src/index.ts b/src/index.ts index b0648a0aa8..fd32a6b243 100644 --- a/src/index.ts +++ b/src/index.ts @@ -29,7 +29,7 @@ export interface PubSubComponents { * PubSubBaseProtocol handles the peers and connections logic for pubsub routers * and specifies the API that pubsub routers should have. */ -export abstract class PubSubBaseProtocol extends EventEmitter implements PubSub { +export abstract class PubSubBaseProtocol = PubSubEvents> extends EventEmitter implements PubSub { public started: boolean /** * Map of topics to which peers are subscribed to @@ -108,10 +108,8 @@ export abstract class PubSubBaseProtocol { if (this.started || !this.enabled) { return } @@ -121,10 +119,12 @@ export abstract class PubSubBaseProtocol await registrar.handle(multicodec, this._onIncomingStream, { - maxInboundStreams: this.maxInboundStreams, - maxOutboundStreams: this.maxOutboundStreams - }))) + await Promise.all(this.multicodecs.map(async multicodec => { + await registrar.handle(multicodec, this._onIncomingStream, { + maxInboundStreams: this.maxInboundStreams, + maxOutboundStreams: this.maxOutboundStreams + }) + })) // register protocol with topology // Topology callbacks called on connection manager changes @@ -141,7 +141,7 @@ export abstract class PubSubBaseProtocol { if (!this.started || !this.enabled) { return } @@ -150,10 +150,14 @@ export abstract class PubSubBaseProtocol registrar.unregister(id)) + this._registrarTopologyIds?.forEach(id => { + registrar.unregister(id) + }) } - await Promise.all(this.multicodecs.map(async multicodec => await registrar.unhandle(multicodec))) + await Promise.all(this.multicodecs.map(async multicodec => { + await registrar.unhandle(multicodec) + })) log('stopping') for (const peerStreams of this.peers.values()) { @@ -166,14 +170,14 @@ export abstract class PubSubBaseProtocol log(err)) + .catch(err => { log(err) }) } /** * Registrar notifies an established connection with pubsub protocol */ - protected _onPeerConnected (peerId: PeerId, conn: Connection) { + protected _onPeerConnected (peerId: PeerId, conn: Connection): void { log('connected %p', peerId) void Promise.resolve().then(async () => { @@ -221,7 +225,7 @@ export abstract class PubSubBaseProtocol, peerStreams: PeerStreams) { + async processMessages (peerId: PeerId, stream: AsyncIterable, peerStreams: PeerStreams): Promise { try { await pipe( stream, @@ -320,7 +324,7 @@ export abstract class PubSubBaseProtocol log(err)) + .catch(err => { log(err) }) } } ) @@ -378,7 +382,7 @@ export abstract class PubSubBaseProtocol log(err)) + .catch(err => { log(err) }) } return true @@ -387,7 +391,7 @@ export abstract class PubSubBaseProtocol { if (this.components.peerId.equals(from) && !this.emitSelf) { return } @@ -442,7 +446,7 @@ export abstract class PubSubBaseProtocol | Uint8Array { const signaturePolicy = this.globalSignaturePolicy switch (signaturePolicy) { case 'StrictSign': @@ -470,7 +474,7 @@ export abstract class PubSubBaseProtocol ({ topic: str, subscribe: Boolean(subscribe) })), messages: (messages ?? []).map(toRpcMessage) }) @@ -507,7 +511,7 @@ export abstract class PubSubBaseProtocol { // eslint-disable-line require-await const signaturePolicy = this.globalSignaturePolicy switch (signaturePolicy) { case 'StrictNoSign': @@ -671,7 +675,7 @@ export abstract class PubSubBaseProtocol { /** * Do we have a connection to read from? */ - get isReadable () { + get isReadable (): boolean { return Boolean(this.inboundStream) } /** * Do we have a connection to write on? */ - get isWritable () { + get isWritable (): boolean { return Boolean(this.outboundStream) } @@ -73,7 +73,7 @@ export class PeerStreams extends EventEmitter { * Send a message to this peer. * Throws if there is no `stream` to write to available. */ - write (data: Uint8Array | Uint8ArrayList) { + write (data: Uint8Array | Uint8ArrayList): void { if (this.outboundStream == null) { const id = this.id.toString() throw new Error('No writable connection to ' + id) @@ -85,7 +85,7 @@ export class PeerStreams extends EventEmitter { /** * Attach a raw inbound stream and setup a read stream */ - attachInboundStream (stream: Stream) { + attachInboundStream (stream: Stream): AsyncIterable { // Create and attach a new inbound stream // The inbound stream is: // - abortable, set to only return on abort, rather than throw @@ -107,12 +107,12 @@ export class PeerStreams extends EventEmitter { /** * Attach a raw outbound stream and setup a write stream */ - async attachOutboundStream (stream: Stream) { + async attachOutboundStream (stream: Stream): Promise> { // If an outbound stream already exists, gently close it const _prevStream = this.outboundStream if (this.outboundStream != null) { // End the stream without emitting a close event - await this.outboundStream.end() + this.outboundStream.end() } this._rawOutboundStream = stream @@ -151,7 +151,7 @@ export class PeerStreams extends EventEmitter { /** * Closes the open connection to peer */ - close () { + close (): void { if (this.closed) { return } diff --git a/src/sign.ts b/src/sign.ts index 8a30ef3ad1..fd61237dc6 100644 --- a/src/sign.ts +++ b/src/sign.ts @@ -45,7 +45,7 @@ export async function signMessage (peerId: PeerId, message: { from: PeerId, topi /** * Verifies the signature of the given message */ -export async function verifySignature (message: SignedMessage, encode: (rpc: PubSubRPCMessage) => Uint8Array) { +export async function verifySignature (message: SignedMessage, encode: (rpc: PubSubRPCMessage) => Uint8Array): Promise { if (message.type !== 'signed') { throw new Error('Message type must be "signed" to be verified') } @@ -80,7 +80,7 @@ export async function verifySignature (message: SignedMessage, encode: (rpc: Pub * Returns the PublicKey associated with the given message. * If no valid PublicKey can be retrieved an error will be returned. */ -export async function messagePublicKey (message: SignedMessage) { +export async function messagePublicKey (message: SignedMessage): Promise { if (message.type !== 'signed') { throw new Error('Message type must be "signed" to have a public key') } diff --git a/src/utils.ts b/src/utils.ts index 8ed27999fb..69dd7b8e35 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -17,7 +17,7 @@ export function randomSeqno (): bigint { /** * Generate a message id, based on the `key` and `seqno` */ -export const msgId = (key: Uint8Array, seqno: bigint) => { +export const msgId = (key: Uint8Array, seqno: bigint): Uint8Array => { const seqnoBytes = uint8ArrayFromString(seqno.toString(16).padStart(16, '0'), 'base16') const msgId = new Uint8Array(key.length + seqnoBytes.length) @@ -30,7 +30,7 @@ export const msgId = (key: Uint8Array, seqno: bigint) => { /** * Generate a message id, based on message `data` */ -export const noSignMsgId = (data: Uint8Array) => { +export const noSignMsgId = (data: Uint8Array): Uint8Array | Promise => { return sha256.encode(data) } @@ -38,7 +38,7 @@ export const noSignMsgId = (data: Uint8Array) => { * Check if any member of the first set is also a member * of the second set */ -export const anyMatch = (a: Set | number[], b: Set | number[]) => { +export const anyMatch = (a: Set | number[], b: Set | number[]): boolean => { let bHas if (Array.isArray(b)) { bHas = (val: number) => b.includes(val) @@ -58,7 +58,7 @@ export const anyMatch = (a: Set | number[], b: Set | number[]) = /** * Make everything an array */ -export const ensureArray = function (maybeArray: T | T[]) { +export const ensureArray = function (maybeArray: T | T[]): T[] { if (!Array.isArray(maybeArray)) { return [maybeArray] } diff --git a/test/emit-self.spec.ts b/test/emit-self.spec.ts index 76609da805..b6b0dc2cee 100644 --- a/test/emit-self.spec.ts +++ b/test/emit-self.spec.ts @@ -10,7 +10,7 @@ import delay from 'delay' const protocol = '/pubsub/1.0.0' const topic = 'foo' const data = uint8ArrayFromString('bar') -const shouldNotHappen = () => expect.fail() +const shouldNotHappen = (): void => expect.fail() describe('emitSelf', () => { let pubsub: PubsubImplementation @@ -50,7 +50,7 @@ describe('emitSelf', () => { await pubsub.publish(topic, data) - return await promise + await promise }) it('should publish a message without data', async () => { @@ -66,7 +66,7 @@ describe('emitSelf', () => { await pubsub.publish(topic) - return await promise + await promise }) }) diff --git a/test/lifecycle.spec.ts b/test/lifecycle.spec.ts index d4b94a8744..95b070f935 100644 --- a/test/lifecycle.spec.ts +++ b/test/lifecycle.spec.ts @@ -12,6 +12,7 @@ import type { PeerId } from '@libp2p/interface-peer-id' import type { Registrar } from '@libp2p/interface-registrar' import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub' import type { Uint8ArrayList } from 'uint8arraylist' +import delay from 'delay' class PubsubProtocol extends PubSubBaseProtocol { decodeRpc (bytes: Uint8Array): PubSubRPC { @@ -51,7 +52,7 @@ describe('pubsub base lifecycle', () => { } pubsub = new PubsubProtocol({ - peerId: peerId, + peerId, registrar: sinonMockRegistrar }, { multicodecs: ['/pubsub/1.0.0'] @@ -154,8 +155,8 @@ describe('pubsub base lifecycle', () => { const [c0, c1] = ConnectionPair() // Notify peers of connection - await topologyA.onConnect(peerIdB, c0) - await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA)) + topologyA.onConnect(peerIdB, c0) + handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA)) expect(pubsubA.peers.size).to.be.eql(1) expect(pubsubB.peers.size).to.be.eql(1) @@ -175,8 +176,8 @@ describe('pubsub base lifecycle', () => { sinon.spy(c0, 'newStream') - await topologyA.onConnect(peerIdB, c0) - await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA)) + topologyA.onConnect(peerIdB, c0) + handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA)) expect(c0.newStream).to.have.property('callCount', 1) // @ts-expect-error _removePeer is a protected method @@ -184,7 +185,9 @@ describe('pubsub base lifecycle', () => { sinon.spy(c2, 'newStream') - await topologyA?.onConnect(peerIdB, c2) + topologyA?.onConnect(peerIdB, c2) + // newStream invocation takes place in a resolved promise + await delay(10) expect(c2.newStream).to.have.property('callCount', 1) // @ts-expect-error _removePeer is a protected method @@ -215,8 +218,8 @@ describe('pubsub base lifecycle', () => { const error = new Error('new stream error') sinon.stub(c0, 'newStream').throws(error) - await topologyA.onConnect(peerIdB, c0) - await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA)) + topologyA.onConnect(peerIdB, c0) + handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA)) expect(c0.newStream).to.have.property('callCount', 1) }) @@ -233,8 +236,8 @@ describe('pubsub base lifecycle', () => { // Notify peers of connection const [c0, c1] = ConnectionPair() - await topologyA.onConnect(peerIdB, c0) - await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA)) + topologyA.onConnect(peerIdB, c0) + handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA)) // Notice peers of disconnect topologyA?.onDisconnect(peerIdB) diff --git a/test/message.spec.ts b/test/message.spec.ts index 6ae2eda114..aca26cccea 100644 --- a/test/message.spec.ts +++ b/test/message.spec.ts @@ -18,7 +18,7 @@ describe('pubsub base messages', () => { before(async () => { peerId = await createPeerId() pubsub = new PubsubImplementation({ - peerId: peerId, + peerId, registrar: new MockRegistrar() }, { multicodecs: ['/pubsub/1.0.0'] diff --git a/test/pubsub.spec.ts b/test/pubsub.spec.ts index c3a32a3a94..0962abce0a 100644 --- a/test/pubsub.spec.ts +++ b/test/pubsub.spec.ts @@ -30,7 +30,7 @@ describe('pubsub base implementation', () => { beforeEach(async () => { const peerId = await createPeerId() pubsub = new PubsubImplementation({ - peerId: peerId, + peerId, registrar: new MockRegistrar() }, { multicodecs: [protocol], @@ -38,7 +38,7 @@ describe('pubsub base implementation', () => { }) }) - afterEach(async () => await pubsub.stop()) + afterEach(async () => { await pubsub.stop() }) it('calls _publish for router to forward messages', async () => { sinon.spy(pubsub, 'publishMessage') @@ -103,7 +103,7 @@ describe('pubsub base implementation', () => { beforeEach(async () => { const peerId = await createPeerId() pubsub = new PubsubImplementation({ - peerId: peerId, + peerId, registrar: new MockRegistrar() }, { multicodecs: [protocol] @@ -111,7 +111,7 @@ describe('pubsub base implementation', () => { await pubsub.start() }) - afterEach(async () => await pubsub.stop()) + afterEach(async () => { await pubsub.stop() }) it('should add subscription', () => { pubsub.subscribe(topic) @@ -164,8 +164,8 @@ describe('pubsub base implementation', () => { // Notify peers of connection const [c0, c1] = ConnectionPair() - await topologyA.onConnect(peerIdB, c0) - await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA)) + topologyA.onConnect(peerIdB, c0) + handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA)) }) afterEach(async () => { @@ -205,7 +205,7 @@ describe('pubsub base implementation', () => { beforeEach(async () => { const peerId = await createPeerId() pubsub = new PubsubImplementation({ - peerId: peerId, + peerId, registrar: new MockRegistrar() }, { multicodecs: [protocol] @@ -213,7 +213,7 @@ describe('pubsub base implementation', () => { await pubsub.start() }) - afterEach(async () => await pubsub.stop()) + afterEach(async () => { await pubsub.stop() }) it('should remove all subscriptions for a topic', () => { pubsub.subscribe(topic) @@ -271,8 +271,8 @@ describe('pubsub base implementation', () => { // Notify peers of connection const [c0, c1] = ConnectionPair() - await topologyA.onConnect(peerIdB, c0) - await handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA)) + topologyA.onConnect(peerIdB, c0) + handlerB.handler(await mockIncomingStreamEvent(protocol, c1, peerIdA)) }) afterEach(async () => { @@ -335,7 +335,7 @@ describe('pubsub base implementation', () => { beforeEach(async () => { peerId = await createPeerId() pubsub = new PubsubImplementation({ - peerId: peerId, + peerId, registrar: new MockRegistrar() }, { multicodecs: [protocol] @@ -343,7 +343,7 @@ describe('pubsub base implementation', () => { await pubsub.start() }) - afterEach(async () => await pubsub.stop()) + afterEach(async () => { await pubsub.stop() }) it('returns the subscribed topics', () => { let subsTopics = pubsub.getTopics() @@ -364,14 +364,14 @@ describe('pubsub base implementation', () => { beforeEach(async () => { peerId = await createPeerId() pubsub = new PubsubImplementation({ - peerId: peerId, + peerId, registrar: new MockRegistrar() }, { multicodecs: [protocol] }) }) - afterEach(async () => await pubsub.stop()) + afterEach(async () => { await pubsub.stop() }) it('should fail if pubsub is not started', () => { const topic = 'test-topic' @@ -435,7 +435,7 @@ describe('pubsub base implementation', () => { beforeEach(async () => { peerId = await createPeerId() pubsub = new PubsubImplementation({ - peerId: peerId, + peerId, registrar: new MockRegistrar() }, { multicodecs: [protocol] @@ -443,7 +443,7 @@ describe('pubsub base implementation', () => { await pubsub.start() }) - afterEach(async () => await pubsub.stop()) + afterEach(async () => { await pubsub.stop() }) it('should drop unsigned messages', async () => { const publishSpy = sinon.spy(pubsub, 'publishMessage') @@ -459,7 +459,7 @@ describe('pubsub base implementation', () => { from: peerStream.id.toBytes(), data, sequenceNumber: await noSignMsgId(data), - topic: topic + topic }] } diff --git a/test/topic-validators.spec.ts b/test/topic-validators.spec.ts index 35cd9e6aee..dfd672303e 100644 --- a/test/topic-validators.spec.ts +++ b/test/topic-validators.spec.ts @@ -24,7 +24,7 @@ describe('topic validators', () => { otherPeerId = await createEd25519PeerId() pubsub = new PubsubImplementation({ - peerId: peerId, + peerId, registrar: new MockRegistrar() }, { multicodecs: [protocol], diff --git a/test/utils/index.ts b/test/utils/index.ts index abeb01b4ac..f21d51f53d 100644 --- a/test/utils/index.ts +++ b/test/utils/index.ts @@ -5,7 +5,7 @@ import { RPC } from '../message/rpc.js' import type { IncomingStreamData, Registrar, StreamHandler, StreamHandlerRecord, Topology } from '@libp2p/interface-registrar' import type { Connection } from '@libp2p/interface-connection' import type { PeerId } from '@libp2p/interface-peer-id' -import type { PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub' +import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub' export const createPeerId = async (): Promise => { const peerId = await PeerIdFactory.createEd25519PeerId() @@ -14,7 +14,7 @@ export const createPeerId = async (): Promise => { } export class PubsubImplementation extends PubSubBaseProtocol { - async publishMessage () { + async publishMessage (): Promise { return { recipients: [] } @@ -41,7 +41,7 @@ export class MockRegistrar implements Registrar { private readonly topologies: Map = new Map() private readonly handlers: Map = new Map() - getProtocols () { + getProtocols (): string[] { const protocols = new Set() for (const topology of this.topologies.values()) { @@ -67,7 +67,7 @@ export class MockRegistrar implements Registrar { } } - async unhandle (protocols: string | string[]) { + async unhandle (protocols: string | string[]): Promise { const protocolList = Array.isArray(protocols) ? protocols : [protocols] protocolList.forEach(protocol => { @@ -85,7 +85,7 @@ export class MockRegistrar implements Registrar { return { handler, options: {} } } - async register (protocols: string | string[], topology: Topology) { + async register (protocols: string | string[], topology: Topology): Promise { if (!Array.isArray(protocols)) { protocols = [protocols] } @@ -100,7 +100,7 @@ export class MockRegistrar implements Registrar { return id } - unregister (id: string | string[]) { + unregister (id: string | string[]): void { if (!Array.isArray(id)) { id = [id] } @@ -108,7 +108,7 @@ export class MockRegistrar implements Registrar { id.forEach(id => this.topologies.delete(id)) } - getTopologies (protocol: string) { + getTopologies (protocol: string): Topology[] { const output: Topology[] = [] for (const { topology, protocols } of this.topologies.values()) {