Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add message byte batching #58

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
"@libp2p/interface": "^0.1.0",
"@libp2p/logger": "^3.0.0",
"abortable-iterator": "^5.0.1",
"it-batched-bytes": "^2.0.3",
"it-foreach": "^2.0.3",
"it-pipe": "^3.0.1",
"it-pushable": "^3.2.0",
Expand Down
18 changes: 17 additions & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ export interface Config {
* This ensures that a single stream doesn't hog a connection.
*/
maxMessageSize: number

/**
* Each byte array written into a multiplexed stream is converted to one or
* more messages which are sent as byte arrays to the remote node. Sending
* lots of small messages can be expensive - use this setting to batch up
* the serialized bytes of all messages sent during the current tick up to
* this limit to send in one go similar to Nagle's algorithm. N.b. you
* should benchmark your application carefully when using this setting as it
* may cause the opposite of the desired effect. Omit this setting to send
* all messages as they become available. (default: 0)
*/
minSendBytes: number
}

export const defaultConfig: Config = {
Expand All @@ -62,7 +74,8 @@ export const defaultConfig: Config = {
maxOutboundStreams: 1_000,
initialStreamWindowSize: INITIAL_STREAM_WINDOW,
maxStreamWindowSize: MAX_STREAM_WINDOW,
maxMessageSize: 64 * 1024
maxMessageSize: 64 * 1024,
minSendBytes: 0
}

export function verifyConfig (config: Config): void {
Expand All @@ -87,4 +100,7 @@ export function verifyConfig (config: Config): void {
if (config.maxMessageSize < 1024) {
throw new CodeError('MaxMessageSize must be greater than a kilobyte', ERR_INVALID_CONFIG)
}
if (config.minSendBytes < 0) {
throw new CodeError('MinSendBytes must be greater or equal to 0', ERR_INVALID_CONFIG)
}
}
36 changes: 27 additions & 9 deletions src/muxer.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { CodeError } from '@libp2p/interface/errors'
import { logger, type Logger } from '@libp2p/logger'
import { abortableSource } from 'abortable-iterator'
import batchedBytes from 'it-batched-bytes'
import { pipe } from 'it-pipe'
import { pushable, type Pushable } from 'it-pushable'
import { pushableV, type PushableV } from 'it-pushable'
import { type Config, defaultConfig, verifyConfig } from './config.js'
import { ERR_BOTH_CLIENTS, ERR_INVALID_FRAME, ERR_MAX_OUTBOUND_STREAMS_EXCEEDED, ERR_MUXER_LOCAL_CLOSED, ERR_MUXER_REMOTE_CLOSED, ERR_NOT_MATCHING_PING, ERR_STREAM_ALREADY_EXISTS, ERR_UNREQUESTED_PING, PROTOCOL_ERRORS } from './constants.js'
import { Decoder } from './decode.js'
Expand All @@ -13,7 +14,7 @@ import type { AbortOptions } from '@libp2p/interface'
import type { Stream } from '@libp2p/interface/connection'
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface/stream-muxer'
import type { Sink, Source } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'
import { Uint8ArrayList } from 'uint8arraylist'

const YAMUX_PROTOCOL_ID = '/yamux/1.0.0'
const CLOSE_TIMEOUT = 500
Expand Down Expand Up @@ -43,12 +44,13 @@ export interface CloseOptions extends AbortOptions {

export class YamuxMuxer implements StreamMuxer {
protocol = YAMUX_PROTOCOL_ID
source: Pushable<Uint8Array>
source: AsyncGenerator<Uint8Array>
sink: Sink<Source<Uint8ArrayList | Uint8Array>, Promise<void>>

private readonly config: Config
private readonly log?: Logger

private readonly _source: PushableV<Uint8Array>
/** Used to close the muxer from either the sink or source */
private readonly closeController: AbortController

Expand Down Expand Up @@ -80,7 +82,7 @@ export class YamuxMuxer implements StreamMuxer {

constructor (init: YamuxMuxerInit) {
this.client = init.direction === 'outbound'
this.config = { ...defaultConfig, ...init }
const config = this.config = { ...defaultConfig, ...init }
this.log = this.config.log
verifyConfig(this.config)

Expand All @@ -91,7 +93,7 @@ export class YamuxMuxer implements StreamMuxer {

this._streams = new Map()

this.source = pushable({
this._source = pushableV({
onEnd: (): void => {
this.log?.trace('muxer source ended')

Expand All @@ -100,6 +102,22 @@ export class YamuxMuxer implements StreamMuxer {
})
}
})
this.source = pipe(
this._source,
config.minSendBytes === 0
? async function * (source) {
for await (const bufs of source) {
yield new Uint8ArrayList(...bufs).subarray()
}
}
: async function * (source) {
yield * batchedBytes(source, {
size: config.minSendBytes,
serialize: (bufs, list) => { list.appendAll(bufs) }

})
}
)

this.sink = async (source: Source<Uint8ArrayList | Uint8Array>): Promise<void> => {
source = abortableSource(
Expand Down Expand Up @@ -322,7 +340,7 @@ export class YamuxMuxer implements StreamMuxer {
this.closeController.abort()

// stop the source
this.source.end()
this._source.end()
}

/** Create a new stream */
Expand Down Expand Up @@ -538,10 +556,10 @@ export class YamuxMuxer implements StreamMuxer {
if (data === undefined) {
throw new CodeError('invalid frame', ERR_INVALID_FRAME)
}
this.source.push(encodeHeader(header))
this.source.push(data)
this._source.push(encodeHeader(header))
this._source.push(data)
} else {
this.source.push(encodeHeader(header))
this._source.push(encodeHeader(header))
}
}

Expand Down