From b267e7a9c494867bc7b7c9ad3e420357ae74116f Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Thu, 3 Aug 2023 16:48:11 +0100 Subject: [PATCH] feat!: close streams gracefully (#344) * feat!: close streams gracefully - Updates all libp2p related deps - Stream close methods are now async BREAKING CHANGE: stream close methods are now asyc, requires libp2p@0.46.x or later * chore: update yamux --- package.json | 41 +++++++++++++------------- src/@types/handshake-interface.ts | 2 +- src/@types/libp2p.ts | 2 +- src/handshake-xx.ts | 22 +++++++------- src/index.ts | 2 +- src/metrics.ts | 2 +- src/noise.ts | 16 +++++----- src/utils.ts | 2 +- test/compliance.spec.ts | 2 +- test/fixtures/peer.ts | 2 +- test/index.spec.ts | 12 ++++---- test/noise.spec.ts | 49 ++++++++++++++++--------------- test/utils.ts | 4 +-- test/xx-handshake.spec.ts | 16 +++++----- 14 files changed, 87 insertions(+), 87 deletions(-) diff --git a/package.json b/package.json index bca8d1f..0791308 100644 --- a/package.json +++ b/package.json @@ -68,41 +68,40 @@ "prepublish": "npm run build" }, "dependencies": { - "@libp2p/crypto": "^1.0.11", - "@libp2p/interface-connection-encrypter": "^4.0.0", - "@libp2p/interface-keys": "^1.0.6", - "@libp2p/interface-metrics": "^4.0.4", - "@libp2p/interface-peer-id": "^2.0.0", - "@libp2p/logger": "^2.0.5", - "@libp2p/peer-id": "^2.0.0", + "@libp2p/crypto": "^2.0.0", + "@libp2p/interface": "^0.1.0", + "@libp2p/logger": "^3.0.0", + "@libp2p/peer-id": "^3.0.0", + "@noble/ciphers": "^0.1.4", "@noble/curves": "^1.1.0", "@noble/hashes": "^1.3.1", - "@noble/ciphers": "^0.1.4", + "it-byte-stream": "^1.0.0", "it-length-prefixed": "^9.0.1", - "it-pair": "^2.0.2", - "it-pb-stream": "^4.0.1", + "it-length-prefixed-stream": "^1.0.0", + "it-pair": "^2.0.6", "it-pipe": "^3.0.1", "it-stream-types": "^2.0.1", "protons-runtime": "^5.0.0", - "uint8arraylist": "^2.3.2", - "uint8arrays": "^4.0.2" + "uint8arraylist": "^2.4.3", + "uint8arrays": "^4.0.4" }, "devDependencies": { - "@chainsafe/libp2p-yamux": "^4.0.1", - "@libp2p/daemon-client": "^6.0.3", - "@libp2p/daemon-server": "^5.0.2", - "@libp2p/interface-connection-encrypter-compliance-tests": "^5.0.0", - "@libp2p/interop": "^8.0.1", - "@libp2p/peer-id-factory": "^2.0.0", - "@libp2p/tcp": "^7.0.0", + "@chainsafe/libp2p-yamux": "^5.0.0", + "@libp2p/daemon-client": "^7.0.0", + "@libp2p/daemon-server": "^6.0.0", + "@libp2p/interface-compliance-tests": "^4.0.0", + "@libp2p/interface-peer-id": "^2.0.2", + "@libp2p/interop": "^9.0.0", + "@libp2p/peer-id-factory": "^3.0.0", + "@libp2p/tcp": "^8.0.0", "@multiformats/multiaddr": "^12.1.0", "@types/sinon": "^10.0.14", - "aegir": "^39.0.5", + "aegir": "^40.0.8", "benchmark": "^2.1.4", "execa": "^7.0.0", "go-libp2p": "^1.0.3", "iso-random-stream": "^2.0.2", - "libp2p": "0.45.0", + "libp2p": "^0.46.0", "mkdirp": "^3.0.0", "p-defer": "^4.0.0", "protons": "^7.0.0", diff --git a/src/@types/handshake-interface.ts b/src/@types/handshake-interface.ts index 0edc201..9b402b1 100644 --- a/src/@types/handshake-interface.ts +++ b/src/@types/handshake-interface.ts @@ -1,7 +1,7 @@ import type { bytes } from './basic.js' import type { NoiseSession } from './handshake.js' import type { NoiseExtensions } from '../proto/payload.js' -import type { PeerId } from '@libp2p/interface-peer-id' +import type { PeerId } from '@libp2p/interface/peer-id' export interface IHandshake { session: NoiseSession diff --git a/src/@types/libp2p.ts b/src/@types/libp2p.ts index 8fa110d..c20fe93 100644 --- a/src/@types/libp2p.ts +++ b/src/@types/libp2p.ts @@ -1,6 +1,6 @@ import type { bytes32 } from './basic.js' import type { NoiseExtensions } from '../proto/payload.js' -import type { ConnectionEncrypter } from '@libp2p/interface-connection-encrypter' +import type { ConnectionEncrypter } from '@libp2p/interface/connection-encrypter' export interface KeyPair { publicKey: bytes32 diff --git a/src/handshake-xx.ts b/src/handshake-xx.ts index 2c74415..92a6cf6 100644 --- a/src/handshake-xx.ts +++ b/src/handshake-xx.ts @@ -1,4 +1,4 @@ -import { InvalidCryptoExchangeError, UnexpectedPeerError } from '@libp2p/interface-connection-encrypter/errors' +import { InvalidCryptoExchangeError, UnexpectedPeerError } from '@libp2p/interface/errors' import { decode0, decode1, decode2, encode0, encode1, encode2 } from './encoder.js' import { XX } from './handshakes/xx.js' import { @@ -20,8 +20,8 @@ import type { CipherState, NoiseSession } from './@types/handshake.js' import type { KeyPair } from './@types/libp2p.js' import type { ICryptoInterface } from './crypto.js' import type { NoiseExtensions } from './proto/payload.js' -import type { PeerId } from '@libp2p/interface-peer-id' -import type { ProtobufStream } from 'it-pb-stream' +import type { PeerId } from '@libp2p/interface/peer-id' +import type { LengthPrefixedStream } from 'it-length-prefixed-stream' export class XXHandshake implements IHandshake { public isInitiator: boolean @@ -30,7 +30,7 @@ export class XXHandshake implements IHandshake { public remoteExtensions: NoiseExtensions = { webtransportCerthashes: [] } protected payload: bytes - protected connection: ProtobufStream + protected connection: LengthPrefixedStream protected xx: XX protected staticKeypair: KeyPair @@ -42,7 +42,7 @@ export class XXHandshake implements IHandshake { prologue: bytes32, crypto: ICryptoInterface, staticKeypair: KeyPair, - connection: ProtobufStream, + connection: LengthPrefixedStream, remotePeer?: PeerId, handshake?: XX ) { @@ -64,12 +64,12 @@ export class XXHandshake implements IHandshake { if (this.isInitiator) { logger.trace('Stage 0 - Initiator starting to send first message.') const messageBuffer = this.xx.sendMessage(this.session, new Uint8Array(0)) - this.connection.writeLP(encode0(messageBuffer)) + await this.connection.write(encode0(messageBuffer)) logger.trace('Stage 0 - Initiator finished sending first message.') logLocalEphemeralKeys(this.session.hs.e) } else { logger.trace('Stage 0 - Responder waiting to receive first message...') - const receivedMessageBuffer = decode0((await this.connection.readLP()).subarray()) + const receivedMessageBuffer = decode0((await this.connection.read()).subarray()) const { valid } = this.xx.recvMessage(this.session, receivedMessageBuffer) if (!valid) { throw new InvalidCryptoExchangeError('xx handshake stage 0 validation fail') @@ -83,7 +83,7 @@ export class XXHandshake implements IHandshake { public async exchange (): Promise { if (this.isInitiator) { logger.trace('Stage 1 - Initiator waiting to receive first message from responder...') - const receivedMessageBuffer = decode1((await this.connection.readLP()).subarray()) + const receivedMessageBuffer = decode1((await this.connection.read()).subarray()) const { plaintext, valid } = this.xx.recvMessage(this.session, receivedMessageBuffer) if (!valid) { throw new InvalidCryptoExchangeError('xx handshake stage 1 validation fail') @@ -106,7 +106,7 @@ export class XXHandshake implements IHandshake { } else { logger.trace('Stage 1 - Responder sending out first message with signed payload and static key.') const messageBuffer = this.xx.sendMessage(this.session, this.payload) - this.connection.writeLP(encode1(messageBuffer)) + await this.connection.write(encode1(messageBuffer)) logger.trace('Stage 1 - Responder sent the second handshake message with signed payload.') logLocalEphemeralKeys(this.session.hs.e) } @@ -117,11 +117,11 @@ export class XXHandshake implements IHandshake { if (this.isInitiator) { logger.trace('Stage 2 - Initiator sending third handshake message.') const messageBuffer = this.xx.sendMessage(this.session, this.payload) - this.connection.writeLP(encode2(messageBuffer)) + await this.connection.write(encode2(messageBuffer)) logger.trace('Stage 2 - Initiator sent message with signed payload.') } else { logger.trace('Stage 2 - Responder waiting for third handshake message...') - const receivedMessageBuffer = decode2((await this.connection.readLP()).subarray()) + const receivedMessageBuffer = decode2((await this.connection.read()).subarray()) const { plaintext, valid } = this.xx.recvMessage(this.session, receivedMessageBuffer) if (!valid) { throw new InvalidCryptoExchangeError('xx handshake stage 2 validation fail') diff --git a/src/index.ts b/src/index.ts index 8c15571..3a42c89 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,7 +1,7 @@ import { Noise } from './noise.js' import type { NoiseInit } from './noise.js' import type { NoiseExtensions } from './proto/payload.js' -import type { ConnectionEncrypter } from '@libp2p/interface-connection-encrypter' +import type { ConnectionEncrypter } from '@libp2p/interface/connection-encrypter' export type { ICryptoInterface } from './crypto.js' export { pureJsCrypto } from './crypto/js.js' diff --git a/src/metrics.ts b/src/metrics.ts index ee8bb67..8d0b3a4 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -1,4 +1,4 @@ -import type { Counter, Metrics } from '@libp2p/interface-metrics' +import type { Counter, Metrics } from '@libp2p/interface/metrics' export type MetricsRegistry = Record diff --git a/src/noise.ts b/src/noise.ts index 72f70f8..2277a75 100644 --- a/src/noise.ts +++ b/src/noise.ts @@ -1,6 +1,6 @@ import { decode } from 'it-length-prefixed' +import { lpStream, type LengthPrefixedStream } from 'it-length-prefixed-stream' import { duplexPair } from 'it-pair/duplex' -import { pbStream, type ProtobufStream } from 'it-pb-stream' import { pipe } from 'it-pipe' import { NOISE_MSG_MAX_LENGTH_BYTES } from './constants.js' import { pureJsCrypto } from './crypto/js.js' @@ -14,13 +14,13 @@ import type { IHandshake } from './@types/handshake-interface.js' import type { INoiseConnection, KeyPair } from './@types/libp2p.js' import type { ICryptoInterface } from './crypto.js' import type { NoiseExtensions } from './proto/payload.js' -import type { SecuredConnection } from '@libp2p/interface-connection-encrypter' -import type { Metrics } from '@libp2p/interface-metrics' -import type { PeerId } from '@libp2p/interface-peer-id' +import type { SecuredConnection } from '@libp2p/interface/connection-encrypter' +import type { Metrics } from '@libp2p/interface/metrics' +import type { PeerId } from '@libp2p/interface/peer-id' import type { Duplex, Source } from 'it-stream-types' interface HandshakeParams { - connection: ProtobufStream + connection: LengthPrefixedStream isInitiator: boolean localPeer: PeerId remotePeer?: PeerId @@ -71,7 +71,7 @@ export class Noise implements INoiseConnection { * @returns {Promise} */ public async secureOutbound (localPeer: PeerId, connection: Duplex, AsyncIterable, Promise>, remotePeer?: PeerId): Promise> { - const wrappedConnection = pbStream( + const wrappedConnection = lpStream( connection, { lengthEncoder: uint16BEEncode, @@ -103,7 +103,7 @@ export class Noise implements INoiseConnection { * @returns {Promise} */ public async secureInbound (localPeer: PeerId, connection: Duplex, AsyncIterable, Promise>, remotePeer?: PeerId): Promise> { - const wrappedConnection = pbStream( + const wrappedConnection = lpStream( connection, { lengthEncoder: uint16BEEncode, @@ -171,7 +171,7 @@ export class Noise implements INoiseConnection { } private async createSecureConnection ( - connection: ProtobufStream, AsyncIterable, Promise>>, + connection: LengthPrefixedStream, AsyncIterable, Promise>>, handshake: IHandshake ): Promise, Source, Promise>> { // Create encryption box/unbox wrapper diff --git a/src/utils.ts b/src/utils.ts index 50e7ce4..993c962 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -4,7 +4,7 @@ import { concat as uint8ArrayConcat } from 'uint8arrays/concat' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { type NoiseExtensions, NoiseHandshakePayload } from './proto/payload.js' import type { bytes } from './@types/basic.js' -import type { PeerId } from '@libp2p/interface-peer-id' +import type { PeerId } from '@libp2p/interface/peer-id' export async function getPayload ( localPeer: PeerId, diff --git a/test/compliance.spec.ts b/test/compliance.spec.ts index bc45eb2..e97dc0a 100644 --- a/test/compliance.spec.ts +++ b/test/compliance.spec.ts @@ -1,4 +1,4 @@ -import tests from '@libp2p/interface-connection-encrypter-compliance-tests' +import tests from '@libp2p/interface-compliance-tests/connection-encryption' import { Noise } from '../src/noise.js' describe('spec compliance tests', function () { diff --git a/test/fixtures/peer.ts b/test/fixtures/peer.ts index 07550e2..753d9d6 100644 --- a/test/fixtures/peer.ts +++ b/test/fixtures/peer.ts @@ -1,5 +1,5 @@ import { createEd25519PeerId, createFromJSON } from '@libp2p/peer-id-factory' -import type { PeerId } from '@libp2p/interface-peer-id' +import type { PeerId } from '@libp2p/interface/peer-id' // ed25519 keys const peers = [{ diff --git a/test/index.spec.ts b/test/index.spec.ts index 255b81d..5ce901d 100644 --- a/test/index.spec.ts +++ b/test/index.spec.ts @@ -1,12 +1,12 @@ import { expect } from 'aegir/chai' +import { lpStream } from 'it-length-prefixed-stream' import { duplexPair } from 'it-pair/duplex' -import { pbStream } from 'it-pb-stream' import sinon from 'sinon' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { noise } from '../src/index.js' import { Noise } from '../src/noise.js' import { createPeerIdsFromFixtures } from './fixtures/peer.js' -import type { Metrics } from '@libp2p/interface-metrics' +import type { Metrics } from '@libp2p/interface/metrics' function createCounterSpy (): ReturnType { return sinon.spy({ @@ -41,11 +41,11 @@ describe('Index', () => { noiseInit.secureOutbound(localPeer, outboundConnection, remotePeer), noiseResp.secureInbound(remotePeer, inboundConnection, localPeer) ]) - const wrappedInbound = pbStream(inbound.conn) - const wrappedOutbound = pbStream(outbound.conn) + const wrappedInbound = lpStream(inbound.conn) + const wrappedOutbound = lpStream(outbound.conn) - wrappedOutbound.writeLP(uint8ArrayFromString('test')) - await wrappedInbound.readLP() + await wrappedOutbound.write(uint8ArrayFromString('test')) + await wrappedInbound.read() expect(metricsRegistry.get('libp2p_noise_xxhandshake_successes_total')?.increment.callCount).to.equal(1) expect(metricsRegistry.get('libp2p_noise_xxhandshake_error_total')?.increment.callCount).to.equal(0) expect(metricsRegistry.get('libp2p_noise_encrypted_packets_total')?.increment.callCount).to.equal(1) diff --git a/test/noise.spec.ts b/test/noise.spec.ts index 3311b2d..8b2c693 100644 --- a/test/noise.spec.ts +++ b/test/noise.spec.ts @@ -1,8 +1,9 @@ import { Buffer } from 'buffer' import { assert, expect } from 'aegir/chai' import { randomBytes } from 'iso-random-stream' +import { byteStream } from 'it-byte-stream' +import { lpStream } from 'it-length-prefixed-stream' import { duplexPair } from 'it-pair/duplex' -import { pbStream } from 'it-pb-stream' import sinon from 'sinon' import { equals as uint8ArrayEquals } from 'uint8arrays/equals' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' @@ -16,7 +17,7 @@ import { Noise } from '../src/noise.js' import { createHandshakePayload, getHandshakePayload, getPayload, signPayload } from '../src/utils.js' import { createPeerIdsFromFixtures } from './fixtures/peer.js' import { getKeyPairFromPeerId } from './utils.js' -import type { PeerId } from '@libp2p/interface-peer-id' +import type { PeerId } from '@libp2p/interface/peer-id' describe('Noise', () => { let remotePeer: PeerId, localPeer: PeerId @@ -40,11 +41,11 @@ describe('Noise', () => { noiseInit.secureOutbound(localPeer, outboundConnection, remotePeer), noiseResp.secureInbound(remotePeer, inboundConnection, localPeer) ]) - const wrappedInbound = pbStream(inbound.conn) - const wrappedOutbound = pbStream(outbound.conn) + const wrappedInbound = lpStream(inbound.conn) + const wrappedOutbound = lpStream(outbound.conn) - wrappedOutbound.writeLP(Buffer.from('test')) - const response = await wrappedInbound.readLP() + await wrappedOutbound.write(Buffer.from('test')) + const response = await wrappedInbound.read() expect(uint8ArrayToString(response.slice())).equal('test') } catch (e) { const err = e as Error @@ -59,7 +60,7 @@ describe('Noise', () => { const [outbound, { wrapped, handshake }] = await Promise.all([ noiseInit.secureOutbound(localPeer, outboundConnection, remotePeer), (async () => { - const wrapped = pbStream( + const wrapped = lpStream( inboundConnection, { lengthEncoder: uint16BEEncode, @@ -74,7 +75,7 @@ describe('Noise', () => { const payload = await getPayload(remotePeer, staticKeys.publicKey) const handshake = new XXHandshake(false, payload, prologue, pureJsCrypto, staticKeys, wrapped, localPeer, xx) - let receivedMessageBuffer = decode0((await wrapped.readLP()).slice()) + let receivedMessageBuffer = decode0((await wrapped.read()).slice()) // The first handshake message contains the initiator's ephemeral public key expect(receivedMessageBuffer.ne.length).equal(32) xx.recvMessage(handshake.session, receivedMessageBuffer) @@ -85,20 +86,20 @@ describe('Noise', () => { const handshakePayload = createHandshakePayload(libp2pPubKey, signedPayload) const messageBuffer = xx.sendMessage(handshake.session, handshakePayload) - wrapped.writeLP(encode1(messageBuffer)) + await wrapped.write(encode1(messageBuffer)) // Stage 2 - finish handshake - receivedMessageBuffer = decode2((await wrapped.readLP()).slice()) + receivedMessageBuffer = decode2((await wrapped.read()).slice()) xx.recvMessage(handshake.session, receivedMessageBuffer) return { wrapped, handshake } })() ]) - const wrappedOutbound = pbStream(outbound.conn) - wrappedOutbound.write(uint8ArrayFromString('test')) + const wrappedOutbound = byteStream(outbound.conn) + await wrappedOutbound.write(uint8ArrayFromString('test')) // Check that noise message is prefixed with 16-bit big-endian unsigned integer - const data = (await wrapped.readLP()).slice() + const data = (await wrapped.read()).slice() const { plaintext: decrypted, valid } = handshake.decrypt(data, handshake.session) // Decrypted data should match expect(uint8ArrayEquals(decrypted, uint8ArrayFromString('test'))).to.be.true() @@ -116,11 +117,11 @@ describe('Noise', () => { noiseInit.secureOutbound(localPeer, outboundConnection, remotePeer), noiseResp.secureInbound(remotePeer, inboundConnection, localPeer) ]) - const wrappedInbound = pbStream(inbound.conn) - const wrappedOutbound = pbStream(outbound.conn) + const wrappedInbound = byteStream(inbound.conn) + const wrappedOutbound = lpStream(outbound.conn) const largePlaintext = randomBytes(60000) - wrappedOutbound.writeLP(Buffer.from(largePlaintext)) + await wrappedOutbound.write(Buffer.from(largePlaintext)) const response = await wrappedInbound.read(60000) expect(response.length).equals(largePlaintext.length) @@ -142,11 +143,11 @@ describe('Noise', () => { noiseInit.secureOutbound(localPeer, outboundConnection, remotePeer), noiseResp.secureInbound(remotePeer, inboundConnection) ]) - const wrappedInbound = pbStream(inbound.conn) - const wrappedOutbound = pbStream(outbound.conn) + const wrappedInbound = lpStream(inbound.conn) + const wrappedOutbound = lpStream(outbound.conn) - wrappedOutbound.writeLP(Buffer.from('test v2')) - const response = await wrappedInbound.readLP() + await wrappedOutbound.write(Buffer.from('test v2')) + const response = await wrappedInbound.read() expect(uint8ArrayToString(response.slice())).equal('test v2') if (inbound.remotePeer.publicKey == null || localPeer.publicKey == null || @@ -195,11 +196,11 @@ describe('Noise', () => { noiseInit.secureOutbound(localPeer, outboundConnection, remotePeer), noiseResp.secureInbound(remotePeer, inboundConnection, localPeer) ]) - const wrappedInbound = pbStream(inbound.conn) - const wrappedOutbound = pbStream(outbound.conn) + const wrappedInbound = lpStream(inbound.conn) + const wrappedOutbound = lpStream(outbound.conn) - wrappedOutbound.writeLP(Buffer.from('test')) - const response = await wrappedInbound.readLP() + await wrappedOutbound.write(Buffer.from('test')) + const response = await wrappedInbound.read() expect(uint8ArrayToString(response.slice())).equal('test') } catch (e) { const err = e as Error diff --git a/test/utils.ts b/test/utils.ts index e36e337..4e5cd5d 100644 --- a/test/utils.ts +++ b/test/utils.ts @@ -1,7 +1,7 @@ import { keys } from '@libp2p/crypto' import type { KeyPair } from '../src/@types/libp2p.js' -import type { PrivateKey } from '@libp2p/interface-keys' -import type { PeerId } from '@libp2p/interface-peer-id' +import type { PrivateKey } from '@libp2p/interface/keys' +import type { PeerId } from '@libp2p/interface/peer-id' export async function generateEd25519Keys (): Promise { return keys.generateKeyPair('Ed25519', 32) diff --git a/test/xx-handshake.spec.ts b/test/xx-handshake.spec.ts index 7e03900..7223e76 100644 --- a/test/xx-handshake.spec.ts +++ b/test/xx-handshake.spec.ts @@ -1,13 +1,13 @@ import { Buffer } from 'buffer' import { assert, expect } from 'aegir/chai' +import { lpStream } from 'it-length-prefixed-stream' import { duplexPair } from 'it-pair/duplex' -import { pbStream } from 'it-pb-stream' import { equals as uint8ArrayEquals } from 'uint8arrays/equals' import { pureJsCrypto } from '../src/crypto/js.js' import { XXHandshake } from '../src/handshake-xx.js' import { getPayload } from '../src/utils.js' import { createPeerIdsFromFixtures } from './fixtures/peer.js' -import type { PeerId } from '@libp2p/interface-peer-id' +import type { PeerId } from '@libp2p/interface/peer-id' describe('XX Handshake', () => { let peerA: PeerId, peerB: PeerId, fakePeer: PeerId @@ -19,8 +19,8 @@ describe('XX Handshake', () => { it('should propose, exchange and finish handshake', async () => { try { const duplex = duplexPair() - const connectionFrom = pbStream(duplex[0]) - const connectionTo = pbStream(duplex[1]) + const connectionFrom = lpStream(duplex[0]) + const connectionTo = lpStream(duplex[1]) const prologue = Buffer.alloc(0) const staticKeysInitiator = pureJsCrypto.generateX25519KeyPair() @@ -66,8 +66,8 @@ describe('XX Handshake', () => { it('Initiator should fail to exchange handshake if given wrong public key in payload', async () => { try { const duplex = duplexPair() - const connectionFrom = pbStream(duplex[0]) - const connectionTo = pbStream(duplex[1]) + const connectionFrom = lpStream(duplex[0]) + const connectionTo = lpStream(duplex[1]) const prologue = Buffer.alloc(0) const staticKeysInitiator = pureJsCrypto.generateX25519KeyPair() @@ -95,8 +95,8 @@ describe('XX Handshake', () => { it('Responder should fail to exchange handshake if given wrong public key in payload', async () => { try { const duplex = duplexPair() - const connectionFrom = pbStream(duplex[0]) - const connectionTo = pbStream(duplex[1]) + const connectionFrom = lpStream(duplex[0]) + const connectionTo = lpStream(duplex[1]) const prologue = Buffer.alloc(0) const staticKeysInitiator = pureJsCrypto.generateX25519KeyPair()