Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

feat!: change stream muxer interface #279

Merged
merged 7 commits into from
Aug 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions packages/interface-connection/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export interface StreamStat {
* It may be encrypted and multiplexed depending on the
* configuration of the nodes.
*/
export interface Stream<T extends Uint8Array | Uint8ArrayList = Uint8Array> extends Duplex<T> {
export interface Stream extends Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array> {
/**
* Close a stream for reading and writing
*/
Expand Down Expand Up @@ -120,23 +120,23 @@ export interface Stream<T extends Uint8Array | Uint8ArrayList = Uint8Array> exte
* multiplexed, depending on the configuration of the nodes
* between which the connection is made.
*/
export interface Connection<T extends Uint8Array | Uint8ArrayList = Uint8Array> {
export interface Connection {
id: string
stat: ConnectionStat
remoteAddr: Multiaddr
remotePeer: PeerId
tags: string[]
streams: Array<Stream<T>>
streams: Stream[]

newStream: (multicodecs: string | string[], options?: AbortOptions) => Promise<Stream>
addStream: (stream: Stream<T>) => void
addStream: (stream: Stream) => void
removeStream: (id: string) => void
close: () => Promise<void>
}

export const symbol = Symbol.for('@libp2p/connection')

export function isConnection (other: any): other is Connection<Uint8Array | Uint8ArrayList> {
export function isConnection (other: any): other is Connection {
return other != null && Boolean(other[symbol])
}

Expand Down
6 changes: 3 additions & 3 deletions packages/interface-metrics/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ export interface Stats {
push: (counter: string, inc: number) => void
}

export interface TrackStreamOptions <T extends Duplex<Uint8Array>> {
export interface TrackStreamOptions {
/**
* A duplex iterable stream
*/
stream: T
stream: Duplex<{ byteLength: number }, any>

/**
* The id of the remote peer that's connected
Expand Down Expand Up @@ -111,7 +111,7 @@ export interface StreamMetrics {
* When the `PeerId` is known, `Metrics.updatePlaceholder` should be called
* with the placeholder string returned from here, and the known `PeerId`.
*/
trackStream: <T extends Duplex<Uint8Array>> (data: TrackStreamOptions<T>) => T
trackStream: (data: TrackStreamOptions) => void
}

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/interface-mocks/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@
"@libp2p/interface-transport": "^1.0.0",
"@libp2p/interfaces": "^3.0.0",
"@libp2p/logger": "^2.0.0",
"@libp2p/multistream-select": "^2.0.0",
"@libp2p/multistream-select": "^3.0.0",
"@libp2p/peer-id": "^1.1.12",
"@libp2p/peer-id-factory": "^1.0.12",
"@multiformats/multiaddr": "^10.2.0",
Expand Down
11 changes: 5 additions & 6 deletions packages/interface-mocks/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import type { PeerId } from '@libp2p/interface-peer-id'
import { mockMultiaddrConnection } from './multiaddr-connection.js'
import type { Registrar } from '@libp2p/interface-registrar'
import { mockRegistrar } from './registrar.js'
import { Dialer, Listener } from '@libp2p/multistream-select'
import * as mss from '@libp2p/multistream-select'
import { logger } from '@libp2p/logger'
import * as STATUS from '@libp2p/interface-connection/status'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { StreamMuxer } from '@libp2p/interface-stream-muxer'
import type { Components } from '@libp2p/components'
import type { AbortOptions } from '@libp2p/interfaces'
import errCode from 'err-code'
import type { Uint8ArrayList } from 'uint8arraylist'

const log = logger('libp2p:mock-connection')

Expand Down Expand Up @@ -79,8 +80,7 @@ class MockConnection implements Connection {

const id = `${Math.random()}`
const stream: Stream = this.muxer.newStream(id)
const mss = new Dialer(stream)
const result = await mss.select(protocols, options)
const result = await mss.select(stream, protocols, options)

const streamWithProtocol: Stream = {
...stream,
Expand Down Expand Up @@ -130,9 +130,8 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio
const muxer = muxerFactory.createStreamMuxer({
direction: direction,
onIncomingStream: (muxedStream) => {
const mss = new Listener(muxedStream)
try {
mss.handle(registrar.getProtocols())
mss.handle(muxedStream, registrar.getProtocols())
.then(({ stream, protocol }) => {
log('%s: incoming stream opened on %s', direction, protocol)
muxedStream = { ...muxedStream, ...stream }
Expand Down Expand Up @@ -169,7 +168,7 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio
return connection
}

export function mockStream (stream: Duplex<Uint8Array>): Stream {
export function mockStream (stream: Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array>): Stream {
return {
...stream,
close: () => {},
Expand Down
10 changes: 5 additions & 5 deletions packages/interface-mocks/src/muxer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type StreamMessage = DataMessage | ResetMessage | CloseMessage | CreateMessage

class MuxedStream {
public id: string
public input: Pushable<Uint8Array>
public input: Pushable<Uint8ArrayList>
public stream: Stream
public type: 'initiator' | 'recipient'

Expand Down Expand Up @@ -299,7 +299,7 @@ class MockMuxer implements StreamMuxer {
try {
await pipe(
abortableSource(source, this.closeController.signal),
(source) => map(source, buf => uint8ArrayToString(buf)),
(source) => map(source, buf => uint8ArrayToString(buf.subarray())),
ndjson.parse,
async (source) => {
for await (const message of source) {
Expand Down Expand Up @@ -344,7 +344,7 @@ class MockMuxer implements StreamMuxer {
}

if (message.type === 'data') {
muxedStream.input.push(uint8ArrayFromString(message.chunk, 'base64'))
muxedStream.input.push(new Uint8ArrayList(uint8ArrayFromString(message.chunk, 'base64')))
} else if (message.type === 'reset') {
this.log('-> reset stream %s %s', muxedStream.type, muxedStream.stream.id)
muxedStream.stream.reset()
Expand Down Expand Up @@ -422,10 +422,10 @@ class MockMuxerFactory implements StreamMuxerFactory {
void pipe(
mockMuxer.streamInput,
ndjson.stringify,
(source) => map(source, str => uint8ArrayFromString(str)),
(source) => map(source, str => new Uint8ArrayList(uint8ArrayFromString(str))),
async (source) => {
for await (const buf of source) {
mockMuxer.input.push(buf)
mockMuxer.input.push(buf.subarray())
}
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
"it-stream-types": "^1.0.4",
"p-defer": "^4.0.0",
"p-limit": "^4.0.0",
"uint8arraylist": "^2.1.2",
"uint8arrays": "^3.0.0"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import type { TestSetup } from '@libp2p/interface-compliance-tests'
import type { Stream } from '@libp2p/interface-connection'
import type { StreamMuxerFactory } from '@libp2p/interface-stream-muxer'
import type { Source, Duplex } from 'it-stream-types'
import { Uint8ArrayList } from 'uint8arraylist'

async function drainAndClose (stream: Duplex<Uint8Array>) {
async function drainAndClose (stream: Duplex<any>) {
return await pipe([], stream, drain)
}

Expand Down Expand Up @@ -144,7 +145,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
})

it('Open a stream on one side, write, open a stream on the other side', async () => {
const toString = (source: Source<Uint8Array>) => map(source, (u) => uint8ArrayToString(u))
const toString = (source: Source<Uint8ArrayList>) => map(source, (u) => uint8ArrayToString(u.subarray()))
const p = duplexPair<Uint8Array>()
const onDialerStreamPromise: DeferredPromise<Stream> = defer()
const onListenerStreamPromise: DeferredPromise<Stream> = defer()
Expand All @@ -169,8 +170,8 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
const dialerConn = dialer.newStream()
const listenerConn = listener.newStream()

void pipe([uint8ArrayFromString('hey')], dialerConn)
void pipe([uint8ArrayFromString('hello')], listenerConn)
void pipe([new Uint8ArrayList(uint8ArrayFromString('hey'))], dialerConn)
void pipe([new Uint8ArrayList(uint8ArrayFromString('hello'))], listenerConn)

const [
dialerStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type { TestSetup } from '@libp2p/interface-compliance-tests'
import type { StreamMuxerFactory } from '@libp2p/interface-stream-muxer'
import pDefer from 'p-defer'
import all from 'it-all'
import { Uint8ArrayList } from 'uint8arraylist'

function randomBuffer () {
return uint8ArrayFromString(Math.random().toString())
Expand All @@ -18,7 +19,7 @@ function randomBuffer () {
const infiniteRandom = {
[Symbol.asyncIterator]: async function * () {
while (true) {
yield randomBuffer()
yield new Uint8ArrayList(randomBuffer())
await delay(50)
}
}
Expand Down Expand Up @@ -220,7 +221,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {

// Pause, and then send some data and close the first stream
await delay(50)
await pipe([randomBuffer()], stream, drain)
await pipe([new Uint8ArrayList(randomBuffer())], stream, drain)
closed = true

// Abort all the other streams later
Expand All @@ -232,7 +233,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
})

it('can close a stream for writing', async () => {
const deferred = pDefer<any>()
const deferred = pDefer<Error>()

const p = duplexPair<Uint8Array>()
const dialerFactory = await common.setup()
Expand All @@ -257,8 +258,8 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
expect(results).to.eql(data)

try {
await stream.sink([randomBuffer()])
} catch (err) {
await stream.sink([new Uint8ArrayList(randomBuffer())])
} catch (err: any) {
deferred.resolve(err)
}

Expand All @@ -283,7 +284,7 @@ export default (common: TestSetup<StreamMuxerFactory>) => {
const p = duplexPair<Uint8Array>()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
const data = [randomBuffer(), randomBuffer()]
const data = [randomBuffer(), randomBuffer()].map(d => new Uint8ArrayList(d))

const listenerFactory = await common.setup()
const listener = listenerFactory.createStreamMuxer({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import drain from 'it-drain'
import all from 'it-all'
import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interface-stream-muxer'
import { Uint8ArrayList } from 'uint8arraylist'

export default async (createMuxer: (init?: StreamMuxerInit) => Promise<StreamMuxer>, nStreams: number, nMsg: number, limit?: number) => {
const [dialerSocket, listenerSocket] = duplexPair<Uint8Array>()

const msg = uint8ArrayFromString('simple msg')
const msg = new Uint8ArrayList(uint8ArrayFromString('simple msg'))

const listener = await createMuxer({
direction: 'inbound',
Expand Down