From bcf7c356cb943e105c1d924ad3ed8c2d32100e41 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Mon, 1 Aug 2022 09:18:37 +0100 Subject: [PATCH] deps: update it-length-prefixed and uint8arraylists deps Update deps to support no-copy operations --- package.json | 11 +++++----- src/index.ts | 46 +++++++++++++++++++++++++++++---------- src/peer-streams.ts | 12 ++++++----- src/sign.ts | 49 ++++++++++++++++++++++++++---------------- src/utils.ts | 32 ++++++++++++++++++++------- test/instance.spec.ts | 7 +++--- test/lifecycle.spec.ts | 7 +++--- test/message.spec.ts | 26 ++++++++++++++++------ test/message/rpc.ts | 37 +++++++++++++++---------------- test/pubsub.spec.ts | 10 ++++----- test/sign.spec.ts | 24 ++++++++++++++------- test/utils.spec.ts | 24 ++++++++++++++------- test/utils/index.ts | 5 +++-- 13 files changed, 188 insertions(+), 102 deletions(-) diff --git a/package.json b/package.json index d863738..aafe2fa 100644 --- a/package.json +++ b/package.json @@ -180,7 +180,7 @@ "@libp2p/crypto": "^1.0.0", "@libp2p/interface-connection": "^2.0.0", "@libp2p/interface-peer-id": "^1.0.2", - "@libp2p/interface-pubsub": "^1.0.3", + "@libp2p/interface-pubsub": "^2.0.0", "@libp2p/interface-registrar": "^2.0.0", "@libp2p/interfaces": "^3.0.2", "@libp2p/logger": "^2.0.0", @@ -191,11 +191,12 @@ "abortable-iterator": "^4.0.2", "err-code": "^3.0.1", "iso-random-stream": "^2.0.0", - "it-length-prefixed": "^7.0.1", + "it-length-prefixed": "^8.0.2", "it-pipe": "^2.0.3", "it-pushable": "^3.0.0", "multiformats": "^9.6.3", "p-queue": "^7.2.0", + "uint8arraylist": "^2.0.0", "uint8arrays": "^3.0.0" }, "devDependencies": { @@ -204,9 +205,9 @@ "delay": "^5.0.0", "it-pair": "^2.0.2", "p-defer": "^4.0.0", - "p-wait-for": "^4.1.0", - "protons": "^3.0.4", - "protons-runtime": "^1.0.4", + "p-wait-for": "^5.0.0", + "protons": "^4.0.1", + "protons-runtime": "^2.0.2", "sinon": "^14.0.0", "util": "^0.12.4" } diff --git a/src/index.ts b/src/index.ts index 449581d..5a66c1e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -6,7 +6,7 @@ import Queue from 'p-queue' import { createTopology } from '@libp2p/topology' import { codes } from './errors.js' import { PeerStreams as PeerStreamsImpl } from './peer-streams.js' -import { toMessage, ensureArray, randomSeqno, noSignMsgId, msgId, toRpcMessage } from './utils.js' +import { toMessage, ensureArray, noSignMsgId, msgId, toRpcMessage, randomSeqno } from './utils.js' import { signMessage, verifySignature @@ -17,6 +17,7 @@ import type { Connection } from '@libp2p/interface-connection' import type { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult } from '@libp2p/interface-pubsub' import { PeerMap, PeerSet } from '@libp2p/peer-collections' import { Components, Initializable } from '@libp2p/components' +import type { Uint8ArrayList } from 'uint8arraylist' const log = logger('libp2p:pubsub') @@ -284,7 +285,7 @@ export abstract class PubSubBaseProtocol extends EventEmi /** * Responsible for processing each RPC message received by other peers. */ - async processMessages (peerId: PeerId, stream: AsyncIterable, peerStreams: PeerStreams) { + async processMessages (peerId: PeerId, stream: AsyncIterable, peerStreams: PeerStreams) { try { await pipe( stream, @@ -446,6 +447,10 @@ export abstract class PubSubBaseProtocol extends EventEmi const signaturePolicy = this.globalSignaturePolicy switch (signaturePolicy) { case 'StrictSign': + if (msg.type !== 'signed') { + throw errcode(new Error('Message type should be "signed" when signature policy is StrictSign but it was not'), codes.ERR_MISSING_SIGNATURE) + } + if (msg.sequenceNumber == null) { throw errcode(new Error('Need seqno when signature policy is StrictSign but it was missing'), codes.ERR_MISSING_SEQNO) } @@ -474,19 +479,19 @@ export abstract class PubSubBaseProtocol extends EventEmi * Decode Uint8Array into an RPC object. * This can be override to use a custom router protobuf. */ - abstract decodeRpc (bytes: Uint8Array): PubSubRPC + abstract decodeRpc (bytes: Uint8Array | Uint8ArrayList): PubSubRPC /** * Encode RPC object into a Uint8Array. * This can be override to use a custom router protobuf. */ - abstract encodeRpc (rpc: PubSubRPC): Uint8Array + abstract encodeRpc (rpc: PubSubRPC): Uint8ArrayList /** * Encode RPC object into a Uint8Array. * This can be override to use a custom router protobuf. */ - abstract encodeMessage (rpc: PubSubRPCMessage): Uint8Array + abstract encodeMessage (rpc: PubSubRPCMessage): Uint8ArrayList /** * Send an rpc object to a peer @@ -523,26 +528,42 @@ export abstract class PubSubBaseProtocol extends EventEmi const signaturePolicy = this.globalSignaturePolicy switch (signaturePolicy) { case 'StrictNoSign': + if (message.type !== 'unsigned') { + throw errcode(new Error('Message type should be "unsigned" when signature policy is StrictNoSign but it was not'), codes.ERR_MISSING_SIGNATURE) + } + + // @ts-expect-error should not be present if (message.signature != null) { throw errcode(new Error('StrictNoSigning: signature should not be present'), codes.ERR_UNEXPECTED_SIGNATURE) } + + // @ts-expect-error should not be present if (message.key != null) { throw errcode(new Error('StrictNoSigning: key should not be present'), codes.ERR_UNEXPECTED_KEY) } + + // @ts-expect-error should not be present if (message.sequenceNumber != null) { throw errcode(new Error('StrictNoSigning: seqno should not be present'), codes.ERR_UNEXPECTED_SEQNO) } break case 'StrictSign': + if (message.type !== 'signed') { + throw errcode(new Error('Message type should be "signed" when signature policy is StrictSign but it was not'), codes.ERR_MISSING_SIGNATURE) + } + if (message.signature == null) { throw errcode(new Error('StrictSigning: Signing required and no signature was present'), codes.ERR_MISSING_SIGNATURE) } + if (message.sequenceNumber == null) { - throw errcode(new Error('StrictSigning: Signing required and no seqno was present'), codes.ERR_MISSING_SEQNO) + throw errcode(new Error('StrictSigning: Signing required and no sequenceNumber was present'), codes.ERR_MISSING_SEQNO) } + if (!(await verifySignature(message, this.encodeMessage.bind(this)))) { throw errcode(new Error('StrictSigning: Invalid message signature'), codes.ERR_INVALID_SIGNATURE) } + break default: throw errcode(new Error('Cannot validate message: unhandled signature policy'), codes.ERR_UNHANDLED_SIGNATURE_POLICY) @@ -559,14 +580,16 @@ export abstract class PubSubBaseProtocol extends EventEmi * Normalizes the message and signs it, if signing is enabled. * Should be used by the routers to create the message to send. */ - async buildMessage (message: Message) { + async buildMessage (message: { from: PeerId, topic: string, data: Uint8Array, sequenceNumber: bigint }): Promise { const signaturePolicy = this.globalSignaturePolicy switch (signaturePolicy) { case 'StrictSign': - message.sequenceNumber = randomSeqno() return await signMessage(this.components.getPeerId(), message, this.encodeMessage.bind(this)) case 'StrictNoSign': - return await Promise.resolve(message) + return await Promise.resolve({ + type: 'unsigned', + ...message + }) default: throw errcode(new Error('Cannot build message: unhandled signature policy'), codes.ERR_UNHANDLED_SIGNATURE_POLICY) } @@ -603,10 +626,11 @@ export abstract class PubSubBaseProtocol extends EventEmi throw new Error('Pubsub has not started') } - const message: Message = { + const message = { from: this.components.getPeerId(), topic, - data: data ?? new Uint8Array(0) + data: data ?? new Uint8Array(0), + sequenceNumber: randomSeqno() } log('publish topic: %s from: %p data: %m', topic, message.from, message.data) diff --git a/src/peer-streams.ts b/src/peer-streams.ts index 6a9d621..a9c259d 100644 --- a/src/peer-streams.ts +++ b/src/peer-streams.ts @@ -8,6 +8,7 @@ import type { PeerId } from '@libp2p/interface-peer-id' import type { Stream } from '@libp2p/interface-connection' import type { Pushable } from 'it-pushable' import type { PeerStreamEvents } from '@libp2p/interface-pubsub' +import { Uint8ArrayList } from 'uint8arraylist' const log = logger('libp2p-pubsub:peer-streams') @@ -25,11 +26,11 @@ export class PeerStreams extends EventEmitter { /** * Write stream - it's preferable to use the write method */ - public outboundStream?: Pushable + public outboundStream?: Pushable /** * Read stream */ - public inboundStream?: AsyncIterable + public inboundStream?: AsyncIterable /** * The raw outbound stream, as retrieved from conn.newStream */ @@ -72,13 +73,13 @@ export class PeerStreams extends EventEmitter { * Send a message to this peer. * Throws if there is no `stream` to write to available. */ - write (data: Uint8Array) { + write (data: Uint8Array | Uint8ArrayList) { if (this.outboundStream == null) { const id = this.id.toString() throw new Error('No writable connection to ' + id) } - this.outboundStream.push(data) + this.outboundStream.push(data instanceof Uint8Array ? new Uint8ArrayList(data) : data) } /** @@ -115,7 +116,8 @@ export class PeerStreams extends EventEmitter { } this._rawOutboundStream = stream - this.outboundStream = pushable({ + this.outboundStream = pushable({ + objectMode: true, onEnd: (shouldEmit) => { // close writable side of the stream if (this._rawOutboundStream != null && this._rawOutboundStream.reset != null) { // eslint-disable-line @typescript-eslint/prefer-optional-chain diff --git a/src/sign.ts b/src/sign.ts index 63d0f38..7b6bd27 100644 --- a/src/sign.ts +++ b/src/sign.ts @@ -3,21 +3,16 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { toRpcMessage } from './utils.js' import type { PeerId } from '@libp2p/interface-peer-id' import { keys } from '@libp2p/crypto' -import type { Message, PubSubRPCMessage } from '@libp2p/interface-pubsub' +import type { PubSubRPCMessage, SignedMessage } from '@libp2p/interface-pubsub' import { peerIdFromKeys } from '@libp2p/peer-id' +import type { Uint8ArrayList } from 'uint8arraylist' export const SignPrefix = uint8ArrayFromString('libp2p-pubsub:') /** * Signs the provided message with the given `peerId` */ -export async function signMessage (peerId: PeerId, message: Message, encode: (rpc: PubSubRPCMessage) => Uint8Array) { - // Get the message in bytes, and prepend with the pubsub prefix - const bytes = uint8ArrayConcat([ - SignPrefix, - encode(toRpcMessage(message)) - ]) - +export async function signMessage (peerId: PeerId, message: { from: PeerId, topic: string, data: Uint8Array, sequenceNumber: bigint }, encode: (rpc: PubSubRPCMessage) => Uint8ArrayList): Promise { if (peerId.privateKey == null) { throw new Error('Cannot sign message, no private key present') } @@ -26,22 +21,36 @@ export async function signMessage (peerId: PeerId, message: Message, encode: (rp throw new Error('Cannot sign message, no public key present') } - const privateKey = await keys.unmarshalPrivateKey(peerId.privateKey) - const signature = await privateKey.sign(bytes) - - const outputMessage: Message = { - ...message, - signature: signature, - key: peerId.publicKey + // @ts-expect-error signature field is missing, added below + const outputMessage: SignedMessage = { + type: 'signed', + topic: message.topic, + data: message.data, + sequenceNumber: message.sequenceNumber, + from: peerId } + // Get the message in bytes, and prepend with the pubsub prefix + const bytes = uint8ArrayConcat([ + SignPrefix, + encode(toRpcMessage(outputMessage)).subarray() + ]) + + const privateKey = await keys.unmarshalPrivateKey(peerId.privateKey) + outputMessage.signature = await privateKey.sign(bytes) + outputMessage.key = peerId.publicKey + return outputMessage } /** * Verifies the signature of the given message */ -export async function verifySignature (message: Message, encode: (rpc: PubSubRPCMessage) => Uint8Array) { +export async function verifySignature (message: SignedMessage, encode: (rpc: PubSubRPCMessage) => Uint8ArrayList) { + if (message.type !== 'signed') { + throw new Error('Message type must be "signed" to be verified') + } + if (message.signature == null) { throw new Error('Message must contain a signature to be verified') } @@ -57,7 +66,7 @@ export async function verifySignature (message: Message, encode: (rpc: PubSubRPC ...toRpcMessage(message), signature: undefined, key: undefined - }) + }).subarray() ]) // Get the public key @@ -72,7 +81,11 @@ export async function verifySignature (message: Message, encode: (rpc: PubSubRPC * Returns the PublicKey associated with the given message. * If no valid PublicKey can be retrieved an error will be returned. */ -export async function messagePublicKey (message: Message) { +export async function messagePublicKey (message: SignedMessage) { + if (message.type !== 'signed') { + throw new Error('Message type must be "signed" to have a public key') + } + // should be available in the from property of the message (peer id) if (message.from == null) { throw new Error('Could not get the public key from the originator id') diff --git a/src/utils.ts b/src/utils.ts index d542133..c67ebdc 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -71,24 +71,40 @@ export const toMessage = (message: PubSubRPCMessage): Message => { throw errcode(new Error('RPC message was missing from'), codes.ERR_MISSING_FROM) } + if (message.sequenceNumber == null || message.from == null || message.signature == null || message.key == null) { + return { + type: 'unsigned', + topic: message.topic ?? '', + data: message.data ?? new Uint8Array(0) + } + } + return { + type: 'signed', from: peerIdFromBytes(message.from), topic: message.topic ?? '', - sequenceNumber: message.sequenceNumber == null ? undefined : bigIntFromBytes(message.sequenceNumber), + sequenceNumber: bigIntFromBytes(message.sequenceNumber), data: message.data ?? new Uint8Array(0), - signature: message.signature ?? undefined, - key: message.key ?? undefined + signature: message.signature, + key: message.key } } export const toRpcMessage = (message: Message): PubSubRPCMessage => { + if (message.type === 'signed') { + return { + from: message.from.multihash.bytes, + data: message.data, + sequenceNumber: bigIntToBytes(message.sequenceNumber), + topic: message.topic, + signature: message.signature, + key: message.key + } + } + return { - from: message.from.multihash.bytes, data: message.data, - sequenceNumber: message.sequenceNumber == null ? undefined : bigIntToBytes(message.sequenceNumber), - topic: message.topic, - signature: message.signature, - key: message.key + topic: message.topic } } diff --git a/test/instance.spec.ts b/test/instance.spec.ts index 894a303..9f2f897 100644 --- a/test/instance.spec.ts +++ b/test/instance.spec.ts @@ -1,21 +1,22 @@ import { expect } from 'aegir/chai' import { PubSubBaseProtocol } from '../src/index.js' import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub' +import type { Uint8ArrayList } from 'uint8arraylist' class PubsubProtocol extends PubSubBaseProtocol { decodeRpc (bytes: Uint8Array): PubSubRPC { throw new Error('Method not implemented.') } - encodeRpc (rpc: PubSubRPC): Uint8Array { + encodeRpc (rpc: PubSubRPC): Uint8ArrayList { throw new Error('Method not implemented.') } - decodeMessage (bytes: Uint8Array): PubSubRPCMessage { + decodeMessage (bytes: Uint8Array | Uint8ArrayList): PubSubRPCMessage { throw new Error('Method not implemented.') } - encodeMessage (rpc: PubSubRPCMessage): Uint8Array { + encodeMessage (rpc: PubSubRPCMessage): Uint8ArrayList { throw new Error('Method not implemented.') } diff --git a/test/lifecycle.spec.ts b/test/lifecycle.spec.ts index 8198695..6024d8c 100644 --- a/test/lifecycle.spec.ts +++ b/test/lifecycle.spec.ts @@ -12,21 +12,22 @@ import type { PeerId } from '@libp2p/interface-peer-id' import type { Registrar } from '@libp2p/interface-registrar' import type { PublishResult, PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub' import { Components } from '@libp2p/components' +import type { Uint8ArrayList } from 'uint8arraylist' class PubsubProtocol extends PubSubBaseProtocol { decodeRpc (bytes: Uint8Array): PubSubRPC { throw new Error('Method not implemented.') } - encodeRpc (rpc: PubSubRPC): Uint8Array { + encodeRpc (rpc: PubSubRPC): Uint8ArrayList { throw new Error('Method not implemented.') } - decodeMessage (bytes: Uint8Array): PubSubRPCMessage { + decodeMessage (bytes: Uint8Array | Uint8ArrayList): PubSubRPCMessage { throw new Error('Method not implemented.') } - encodeMessage (rpc: PubSubRPCMessage): Uint8Array { + encodeMessage (rpc: PubSubRPCMessage): Uint8ArrayList { throw new Error('Method not implemented.') } diff --git a/test/message.spec.ts b/test/message.spec.ts index 3cb187f..889ab68 100644 --- a/test/message.spec.ts +++ b/test/message.spec.ts @@ -10,6 +10,7 @@ import { import type { PeerId } from '@libp2p/interface-peer-id' import type { Message } from '@libp2p/interface-pubsub' import { Components } from '@libp2p/components' +import { randomSeqno } from '../src/utils.js' describe('pubsub base messages', () => { let peerId: PeerId @@ -31,10 +32,11 @@ describe('pubsub base messages', () => { }) it('buildMessage normalizes and signs messages', async () => { - const message: Message = { + const message = { from: peerId, data: uint8ArrayFromString('hello'), - topic: 'test-topic' + topic: 'test-topic', + sequenceNumber: randomSeqno() } const signedMessage = await pubsub.buildMessage(message) @@ -43,34 +45,43 @@ describe('pubsub base messages', () => { }) it('validate with StrictNoSign will reject a message with from, signature, key, seqno present', async () => { - const message: Message = { + const message = { from: peerId, data: uint8ArrayFromString('hello'), - topic: 'test-topic' + topic: 'test-topic', + sequenceNumber: randomSeqno() } sinon.stub(pubsub, 'globalSignaturePolicy').value('StrictSign') const signedMessage = await pubsub.buildMessage(message) + if (signedMessage.type === 'unsigned') { + throw new Error('Message was not signed') + } + sinon.stub(pubsub, 'globalSignaturePolicy').value('StrictNoSign') await expect(pubsub.validate(signedMessage)).to.eventually.be.rejected() // @ts-expect-error this field is not optional delete signedMessage.from await expect(pubsub.validate(signedMessage)).to.eventually.be.rejected() + // @ts-expect-error this field is not optional delete signedMessage.signature await expect(pubsub.validate(signedMessage)).to.eventually.be.rejected() + // @ts-expect-error this field is not optional delete signedMessage.key await expect(pubsub.validate(signedMessage)).to.eventually.be.rejected() + // @ts-expect-error this field is not optional delete signedMessage.sequenceNumber await expect(pubsub.validate(signedMessage)).to.eventually.not.be.rejected() }) it('validate with StrictNoSign will validate a message without a signature, key, and seqno', async () => { - const message: Message = { + const message = { from: peerId, data: uint8ArrayFromString('hello'), - topic: 'test-topic' + topic: 'test-topic', + sequenceNumber: randomSeqno() } sinon.stub(pubsub, 'globalSignaturePolicy').value('StrictNoSign') @@ -80,8 +91,9 @@ describe('pubsub base messages', () => { }) it('validate with StrictSign requires a signature', async () => { + // @ts-expect-error incomplete implementation const message: Message = { - from: peerId, + type: 'signed', data: uint8ArrayFromString('hello'), topic: 'test-topic' } diff --git a/test/message/rpc.ts b/test/message/rpc.ts index 4440d48..f4e5283 100644 --- a/test/message/rpc.ts +++ b/test/message/rpc.ts @@ -3,6 +3,7 @@ import { encodeMessage, decodeMessage, message, bool, string, bytes, uint64 } from 'protons-runtime' import type { Codec } from 'protons-runtime' +import type { Uint8ArrayList } from 'uint8arraylist' export interface RPC { subscriptions: RPC.SubOpts[] @@ -24,11 +25,11 @@ export namespace RPC { }) } - export const encode = (obj: SubOpts): Uint8Array => { + export const encode = (obj: SubOpts): Uint8ArrayList => { return encodeMessage(obj, SubOpts.codec()) } - export const decode = (buf: Uint8Array): SubOpts => { + export const decode = (buf: Uint8Array | Uint8ArrayList): SubOpts => { return decodeMessage(buf, SubOpts.codec()) } } @@ -54,11 +55,11 @@ export namespace RPC { }) } - export const encode = (obj: Message): Uint8Array => { + export const encode = (obj: Message): Uint8ArrayList => { return encodeMessage(obj, Message.codec()) } - export const decode = (buf: Uint8Array): Message => { + export const decode = (buf: Uint8Array | Uint8ArrayList): Message => { return decodeMessage(buf, Message.codec()) } } @@ -71,11 +72,11 @@ export namespace RPC { }) } - export const encode = (obj: RPC): Uint8Array => { + export const encode = (obj: RPC): Uint8ArrayList => { return encodeMessage(obj, RPC.codec()) } - export const decode = (buf: Uint8Array): RPC => { + export const decode = (buf: Uint8Array | Uint8ArrayList): RPC => { return decodeMessage(buf, RPC.codec()) } } @@ -97,11 +98,11 @@ export namespace ControlMessage { }) } - export const encode = (obj: ControlMessage): Uint8Array => { + export const encode = (obj: ControlMessage): Uint8ArrayList => { return encodeMessage(obj, ControlMessage.codec()) } - export const decode = (buf: Uint8Array): ControlMessage => { + export const decode = (buf: Uint8Array | Uint8ArrayList): ControlMessage => { return decodeMessage(buf, ControlMessage.codec()) } } @@ -119,11 +120,11 @@ export namespace ControlIHave { }) } - export const encode = (obj: ControlIHave): Uint8Array => { + export const encode = (obj: ControlIHave): Uint8ArrayList => { return encodeMessage(obj, ControlIHave.codec()) } - export const decode = (buf: Uint8Array): ControlIHave => { + export const decode = (buf: Uint8Array | Uint8ArrayList): ControlIHave => { return decodeMessage(buf, ControlIHave.codec()) } } @@ -139,11 +140,11 @@ export namespace ControlIWant { }) } - export const encode = (obj: ControlIWant): Uint8Array => { + export const encode = (obj: ControlIWant): Uint8ArrayList => { return encodeMessage(obj, ControlIWant.codec()) } - export const decode = (buf: Uint8Array): ControlIWant => { + export const decode = (buf: Uint8Array | Uint8ArrayList): ControlIWant => { return decodeMessage(buf, ControlIWant.codec()) } } @@ -159,11 +160,11 @@ export namespace ControlGraft { }) } - export const encode = (obj: ControlGraft): Uint8Array => { + export const encode = (obj: ControlGraft): Uint8ArrayList => { return encodeMessage(obj, ControlGraft.codec()) } - export const decode = (buf: Uint8Array): ControlGraft => { + export const decode = (buf: Uint8Array | Uint8ArrayList): ControlGraft => { return decodeMessage(buf, ControlGraft.codec()) } } @@ -183,11 +184,11 @@ export namespace ControlPrune { }) } - export const encode = (obj: ControlPrune): Uint8Array => { + export const encode = (obj: ControlPrune): Uint8ArrayList => { return encodeMessage(obj, ControlPrune.codec()) } - export const decode = (buf: Uint8Array): ControlPrune => { + export const decode = (buf: Uint8Array | Uint8ArrayList): ControlPrune => { return decodeMessage(buf, ControlPrune.codec()) } } @@ -205,11 +206,11 @@ export namespace PeerInfo { }) } - export const encode = (obj: PeerInfo): Uint8Array => { + export const encode = (obj: PeerInfo): Uint8ArrayList => { return encodeMessage(obj, PeerInfo.codec()) } - export const decode = (buf: Uint8Array): PeerInfo => { + export const decode = (buf: Uint8Array | Uint8ArrayList): PeerInfo => { return decodeMessage(buf, PeerInfo.codec()) } } diff --git a/test/pubsub.spec.ts b/test/pubsub.spec.ts index d5d8d7f..411f50b 100644 --- a/test/pubsub.spec.ts +++ b/test/pubsub.spec.ts @@ -16,7 +16,7 @@ import { PeerSet } from '@libp2p/peer-collections' import { Components } from '@libp2p/components' import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { noSignMsgId } from '../src/utils.js' -import type { PubSubRPC } from '@libp2p/interface-pubsub' +import type { Message, PubSubRPC } from '@libp2p/interface-pubsub' import delay from 'delay' import pDefer from 'p-defer' @@ -59,20 +59,18 @@ describe('pubsub base implementation', () => { }) it('should sign messages on publish', async () => { - sinon.spy(pubsub, 'publishMessage') + const publishMessageSpy = sinon.spy(pubsub, 'publishMessage') await pubsub.start() await pubsub.publish(topic, message) // event dispatch is async await pWaitFor(() => { - // @ts-expect-error .callCount is a added by sinon - return pubsub.publishMessage.callCount === 1 + return publishMessageSpy.callCount === 1 }) // Get the first message sent to _publish, and validate it - // @ts-expect-error .getCall is a added by sinon - const signedMessage: Message = pubsub.publishMessage.getCall(0).lastArg + const signedMessage: Message = publishMessageSpy.getCall(0).lastArg await expect(pubsub.validate(signedMessage)).to.eventually.be.undefined() }) diff --git a/test/sign.spec.ts b/test/sign.spec.ts index 1a0d6c9..89e8d53 100644 --- a/test/sign.spec.ts +++ b/test/sign.spec.ts @@ -10,10 +10,11 @@ import { import * as PeerIdFactory from '@libp2p/peer-id-factory' import { randomSeqno, toRpcMessage } from '../src/utils.js' import { keys } from '@libp2p/crypto' -import type { Message, PubSubRPCMessage } from '@libp2p/interface-pubsub' +import type { PubSubRPCMessage } from '@libp2p/interface-pubsub' import type { PeerId } from '@libp2p/interface-peer-id' +import type { Uint8ArrayList } from 'uint8arraylist' -function encodeMessage (message: PubSubRPCMessage) { +function encodeMessage (message: PubSubRPCMessage): Uint8ArrayList { return RPC.Message.encode(message) } @@ -27,14 +28,16 @@ describe('message signing', () => { }) it('should be able to sign and verify a message', async () => { - const message: Message = { + const message = { + type: 'signed', from: peerId, data: uint8ArrayFromString('hello'), sequenceNumber: randomSeqno(), topic: 'test-topic' } - const bytesToSign = uint8ArrayConcat([SignPrefix, RPC.Message.encode(toRpcMessage(message))]) + // @ts-expect-error missing fields + const bytesToSign = uint8ArrayConcat([SignPrefix, RPC.Message.encode(toRpcMessage(message)).subarray()]) if (peerId.privateKey == null) { throw new Error('No private key found on PeerId') @@ -60,14 +63,16 @@ describe('message signing', () => { it('should be able to extract the public key from an inlined key', async () => { const secPeerId = await PeerIdFactory.createSecp256k1PeerId() - const message: Message = { + const message = { + type: 'signed', from: secPeerId, data: uint8ArrayFromString('hello'), sequenceNumber: randomSeqno(), topic: 'test-topic' } - const bytesToSign = uint8ArrayConcat([SignPrefix, RPC.Message.encode(toRpcMessage(message))]) + // @ts-expect-error missing fields + const bytesToSign = uint8ArrayConcat([SignPrefix, RPC.Message.encode(toRpcMessage(message)).subarray()]) if (secPeerId.privateKey == null) { throw new Error('No private key found on PeerId') @@ -80,6 +85,7 @@ describe('message signing', () => { // Check the signature and public key expect(signedMessage.signature).to.eql(expectedSignature) + // @ts-expect-error field is required signedMessage.key = undefined // Verify the signature @@ -91,14 +97,16 @@ describe('message signing', () => { }) it('should be able to extract the public key from the message', async () => { - const message: Message = { + const message = { + type: 'signed', from: peerId, data: uint8ArrayFromString('hello'), sequenceNumber: randomSeqno(), topic: 'test-topic' } - const bytesToSign = uint8ArrayConcat([SignPrefix, RPC.Message.encode(toRpcMessage(message))]) + // @ts-expect-error missing fields + const bytesToSign = uint8ArrayConcat([SignPrefix, RPC.Message.encode(toRpcMessage(message)).subarray()]) if (peerId.privateKey == null) { throw new Error('No private key found on PeerId') diff --git a/test/utils.spec.ts b/test/utils.spec.ts index 85a31ad..85b0895 100644 --- a/test/utils.spec.ts +++ b/test/utils.spec.ts @@ -44,28 +44,36 @@ describe('utils', () => { const binaryId = uint8ArrayFromString('1220e2187eb3e6c4fb3e7ff9ad4658610624a6315e0240fc6f37130eedb661e939cc', 'base16') const stringId = 'QmdZEWgtaWAxBh93fELFT298La1rsZfhiC2pqwMVwy3jZM' const m: Message[] = [{ + type: 'signed', from: peerIdFromBytes(binaryId), topic: '', - data: new Uint8Array() + data: new Uint8Array(), + sequenceNumber: 1n, + signature: new Uint8Array(), + key: new Uint8Array() }, { + type: 'signed', from: peerIdFromString(stringId), topic: '', - data: new Uint8Array() + data: new Uint8Array(), + sequenceNumber: 1n, + signature: new Uint8Array(), + key: new Uint8Array() }] const expected: PubSubRPCMessage[] = [{ from: binaryId, topic: '', data: new Uint8Array(), - sequenceNumber: undefined, - signature: undefined, - key: undefined + sequenceNumber: utils.bigIntToBytes(1n), + signature: new Uint8Array(), + key: new Uint8Array() }, { from: binaryId, topic: '', data: new Uint8Array(), - sequenceNumber: undefined, - signature: undefined, - key: undefined + sequenceNumber: utils.bigIntToBytes(1n), + signature: new Uint8Array(), + key: new Uint8Array() }] for (let i = 0; i < m.length; i++) { expect(utils.toRpcMessage(m[i])).to.deep.equal(expected[i]) diff --git a/test/utils/index.ts b/test/utils/index.ts index abeb01b..f8a2f9b 100644 --- a/test/utils/index.ts +++ b/test/utils/index.ts @@ -6,6 +6,7 @@ import type { IncomingStreamData, Registrar, StreamHandler, StreamHandlerRecord, import type { Connection } from '@libp2p/interface-connection' import type { PeerId } from '@libp2p/interface-peer-id' import type { PubSubRPC, PubSubRPCMessage } from '@libp2p/interface-pubsub' +import type { Uint8ArrayList } from 'uint8arraylist' export const createPeerId = async (): Promise => { const peerId = await PeerIdFactory.createEd25519PeerId() @@ -24,7 +25,7 @@ export class PubsubImplementation extends PubSubBaseProtocol { return RPC.decode(bytes) } - encodeRpc (rpc: PubSubRPC): Uint8Array { + encodeRpc (rpc: PubSubRPC): Uint8ArrayList { return RPC.encode(rpc) } @@ -32,7 +33,7 @@ export class PubsubImplementation extends PubSubBaseProtocol { return RPC.Message.decode(bytes) } - encodeMessage (rpc: PubSubRPCMessage): Uint8Array { + encodeMessage (rpc: PubSubRPCMessage): Uint8ArrayList { return RPC.Message.encode(rpc) } }