diff --git a/packages/interface-connection/src/index.ts b/packages/interface-connection/src/index.ts index 1f68d8a9a..799af8ed9 100644 --- a/packages/interface-connection/src/index.ts +++ b/packages/interface-connection/src/index.ts @@ -72,7 +72,7 @@ export interface StreamStat { * It may be encrypted and multiplexed depending on the * configuration of the nodes. */ -export interface Stream extends Duplex { +export interface Stream extends Duplex { /** * Close a stream for reading and writing */ @@ -120,23 +120,23 @@ export interface Stream exte * multiplexed, depending on the configuration of the nodes * between which the connection is made. */ -export interface Connection { +export interface Connection { id: string stat: ConnectionStat remoteAddr: Multiaddr remotePeer: PeerId tags: string[] - streams: Array> + streams: Stream[] newStream: (multicodecs: string | string[], options?: AbortOptions) => Promise - addStream: (stream: Stream) => void + addStream: (stream: Stream) => void removeStream: (id: string) => void close: () => Promise } export const symbol = Symbol.for('@libp2p/connection') -export function isConnection (other: any): other is Connection { +export function isConnection (other: any): other is Connection { return other != null && Boolean(other[symbol]) } diff --git a/packages/interface-metrics/src/index.ts b/packages/interface-metrics/src/index.ts index e6a60c1f6..e7248591c 100644 --- a/packages/interface-metrics/src/index.ts +++ b/packages/interface-metrics/src/index.ts @@ -46,11 +46,11 @@ export interface Stats { push: (counter: string, inc: number) => void } -export interface TrackStreamOptions > { +export interface TrackStreamOptions { /** * A duplex iterable stream */ - stream: T + stream: Duplex<{ byteLength: number }, any> /** * The id of the remote peer that's connected @@ -111,7 +111,7 @@ export interface StreamMetrics { * When the `PeerId` is known, `Metrics.updatePlaceholder` should be called * with the placeholder string returned from here, and the known `PeerId`. */ - trackStream: > (data: TrackStreamOptions) => T + trackStream: (data: TrackStreamOptions) => void } /** diff --git a/packages/interface-mocks/package.json b/packages/interface-mocks/package.json index 0cf530d2b..b433230af 100644 --- a/packages/interface-mocks/package.json +++ b/packages/interface-mocks/package.json @@ -151,7 +151,7 @@ "@libp2p/interface-transport": "^1.0.0", "@libp2p/interfaces": "^3.0.0", "@libp2p/logger": "^2.0.0", - "@libp2p/multistream-select": "^2.0.0", + "@libp2p/multistream-select": "^3.0.0", "@libp2p/peer-id": "^1.1.12", "@libp2p/peer-id-factory": "^1.0.12", "@multiformats/multiaddr": "^10.2.0", diff --git a/packages/interface-mocks/src/connection.ts b/packages/interface-mocks/src/connection.ts index dde9b1d92..9c1521a52 100644 --- a/packages/interface-mocks/src/connection.ts +++ b/packages/interface-mocks/src/connection.ts @@ -8,7 +8,7 @@ import type { PeerId } from '@libp2p/interface-peer-id' import { mockMultiaddrConnection } from './multiaddr-connection.js' import type { Registrar } from '@libp2p/interface-registrar' import { mockRegistrar } from './registrar.js' -import { Dialer, Listener } from '@libp2p/multistream-select' +import * as mss from '@libp2p/multistream-select' import { logger } from '@libp2p/logger' import * as STATUS from '@libp2p/interface-connection/status' import type { Multiaddr } from '@multiformats/multiaddr' @@ -16,6 +16,7 @@ import type { StreamMuxer } from '@libp2p/interface-stream-muxer' import type { Components } from '@libp2p/components' import type { AbortOptions } from '@libp2p/interfaces' import errCode from 'err-code' +import type { Uint8ArrayList } from 'uint8arraylist' const log = logger('libp2p:mock-connection') @@ -79,8 +80,7 @@ class MockConnection implements Connection { const id = `${Math.random()}` const stream: Stream = this.muxer.newStream(id) - const mss = new Dialer(stream) - const result = await mss.select(protocols, options) + const result = await mss.select(stream, protocols, options) const streamWithProtocol: Stream = { ...stream, @@ -130,9 +130,8 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio const muxer = muxerFactory.createStreamMuxer({ direction: direction, onIncomingStream: (muxedStream) => { - const mss = new Listener(muxedStream) try { - mss.handle(registrar.getProtocols()) + mss.handle(muxedStream, registrar.getProtocols()) .then(({ stream, protocol }) => { log('%s: incoming stream opened on %s', direction, protocol) muxedStream = { ...muxedStream, ...stream } @@ -169,7 +168,7 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio return connection } -export function mockStream (stream: Duplex): Stream { +export function mockStream (stream: Duplex): Stream { return { ...stream, close: () => {}, diff --git a/packages/interface-mocks/src/muxer.ts b/packages/interface-mocks/src/muxer.ts index 923862abd..4ed358b55 100644 --- a/packages/interface-mocks/src/muxer.ts +++ b/packages/interface-mocks/src/muxer.ts @@ -46,7 +46,7 @@ type StreamMessage = DataMessage | ResetMessage | CloseMessage | CreateMessage class MuxedStream { public id: string - public input: Pushable + public input: Pushable public stream: Stream public type: 'initiator' | 'recipient' @@ -299,7 +299,7 @@ class MockMuxer implements StreamMuxer { try { await pipe( abortableSource(source, this.closeController.signal), - (source) => map(source, buf => uint8ArrayToString(buf)), + (source) => map(source, buf => uint8ArrayToString(buf.subarray())), ndjson.parse, async (source) => { for await (const message of source) { @@ -344,7 +344,7 @@ class MockMuxer implements StreamMuxer { } if (message.type === 'data') { - muxedStream.input.push(uint8ArrayFromString(message.chunk, 'base64')) + muxedStream.input.push(new Uint8ArrayList(uint8ArrayFromString(message.chunk, 'base64'))) } else if (message.type === 'reset') { this.log('-> reset stream %s %s', muxedStream.type, muxedStream.stream.id) muxedStream.stream.reset() @@ -422,10 +422,10 @@ class MockMuxerFactory implements StreamMuxerFactory { void pipe( mockMuxer.streamInput, ndjson.stringify, - (source) => map(source, str => uint8ArrayFromString(str)), + (source) => map(source, str => new Uint8ArrayList(uint8ArrayFromString(str))), async (source) => { for await (const buf of source) { - mockMuxer.input.push(buf) + mockMuxer.input.push(buf.subarray()) } } ) diff --git a/packages/interface-stream-muxer-compliance-tests/package.json b/packages/interface-stream-muxer-compliance-tests/package.json index 140af1bc0..3a84752d4 100644 --- a/packages/interface-stream-muxer-compliance-tests/package.json +++ b/packages/interface-stream-muxer-compliance-tests/package.json @@ -146,6 +146,7 @@ "it-stream-types": "^1.0.4", "p-defer": "^4.0.0", "p-limit": "^4.0.0", + "uint8arraylist": "^2.1.2", "uint8arrays": "^3.0.0" } } diff --git a/packages/interface-stream-muxer-compliance-tests/src/base-test.ts b/packages/interface-stream-muxer-compliance-tests/src/base-test.ts index d39cc6454..b4b5e2733 100644 --- a/packages/interface-stream-muxer-compliance-tests/src/base-test.ts +++ b/packages/interface-stream-muxer-compliance-tests/src/base-test.ts @@ -13,8 +13,9 @@ import type { TestSetup } from '@libp2p/interface-compliance-tests' import type { Stream } from '@libp2p/interface-connection' import type { StreamMuxerFactory } from '@libp2p/interface-stream-muxer' import type { Source, Duplex } from 'it-stream-types' +import { Uint8ArrayList } from 'uint8arraylist' -async function drainAndClose (stream: Duplex) { +async function drainAndClose (stream: Duplex) { return await pipe([], stream, drain) } @@ -144,7 +145,7 @@ export default (common: TestSetup) => { }) it('Open a stream on one side, write, open a stream on the other side', async () => { - const toString = (source: Source) => map(source, (u) => uint8ArrayToString(u)) + const toString = (source: Source) => map(source, (u) => uint8ArrayToString(u.subarray())) const p = duplexPair() const onDialerStreamPromise: DeferredPromise = defer() const onListenerStreamPromise: DeferredPromise = defer() @@ -169,8 +170,8 @@ export default (common: TestSetup) => { const dialerConn = dialer.newStream() const listenerConn = listener.newStream() - void pipe([uint8ArrayFromString('hey')], dialerConn) - void pipe([uint8ArrayFromString('hello')], listenerConn) + void pipe([new Uint8ArrayList(uint8ArrayFromString('hey'))], dialerConn) + void pipe([new Uint8ArrayList(uint8ArrayFromString('hello'))], listenerConn) const [ dialerStream, diff --git a/packages/interface-stream-muxer-compliance-tests/src/close-test.ts b/packages/interface-stream-muxer-compliance-tests/src/close-test.ts index 9b6f5fa61..8536a216a 100644 --- a/packages/interface-stream-muxer-compliance-tests/src/close-test.ts +++ b/packages/interface-stream-muxer-compliance-tests/src/close-test.ts @@ -10,6 +10,7 @@ import type { TestSetup } from '@libp2p/interface-compliance-tests' import type { StreamMuxerFactory } from '@libp2p/interface-stream-muxer' import pDefer from 'p-defer' import all from 'it-all' +import { Uint8ArrayList } from 'uint8arraylist' function randomBuffer () { return uint8ArrayFromString(Math.random().toString()) @@ -18,7 +19,7 @@ function randomBuffer () { const infiniteRandom = { [Symbol.asyncIterator]: async function * () { while (true) { - yield randomBuffer() + yield new Uint8ArrayList(randomBuffer()) await delay(50) } } @@ -220,7 +221,7 @@ export default (common: TestSetup) => { // Pause, and then send some data and close the first stream await delay(50) - await pipe([randomBuffer()], stream, drain) + await pipe([new Uint8ArrayList(randomBuffer())], stream, drain) closed = true // Abort all the other streams later @@ -232,7 +233,7 @@ export default (common: TestSetup) => { }) it('can close a stream for writing', async () => { - const deferred = pDefer() + const deferred = pDefer() const p = duplexPair() const dialerFactory = await common.setup() @@ -257,8 +258,8 @@ export default (common: TestSetup) => { expect(results).to.eql(data) try { - await stream.sink([randomBuffer()]) - } catch (err) { + await stream.sink([new Uint8ArrayList(randomBuffer())]) + } catch (err: any) { deferred.resolve(err) } @@ -283,7 +284,7 @@ export default (common: TestSetup) => { const p = duplexPair() const dialerFactory = await common.setup() const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' }) - const data = [randomBuffer(), randomBuffer()] + const data = [randomBuffer(), randomBuffer()].map(d => new Uint8ArrayList(d)) const listenerFactory = await common.setup() const listener = listenerFactory.createStreamMuxer({ diff --git a/packages/interface-stream-muxer-compliance-tests/src/spawner.ts b/packages/interface-stream-muxer-compliance-tests/src/spawner.ts index 805ece2a7..987d27abc 100644 --- a/packages/interface-stream-muxer-compliance-tests/src/spawner.ts +++ b/packages/interface-stream-muxer-compliance-tests/src/spawner.ts @@ -6,11 +6,12 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import drain from 'it-drain' import all from 'it-all' import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interface-stream-muxer' +import { Uint8ArrayList } from 'uint8arraylist' export default async (createMuxer: (init?: StreamMuxerInit) => Promise, nStreams: number, nMsg: number, limit?: number) => { const [dialerSocket, listenerSocket] = duplexPair() - const msg = uint8ArrayFromString('simple msg') + const msg = new Uint8ArrayList(uint8ArrayFromString('simple msg')) const listener = await createMuxer({ direction: 'inbound',