Skip to content

Commit

Permalink
Allow multiple readers at once
Browse files Browse the repository at this point in the history
  • Loading branch information
ehmicky committed Mar 10, 2024
1 parent 9749f68 commit 9779d56
Show file tree
Hide file tree
Showing 5 changed files with 391 additions and 6 deletions.
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,13 @@
"object",
"concat"
],
"dependencies": {
"is-stream": "^4.0.1"
},
"devDependencies": {
"@types/node": "^20.8.9",
"ava": "^5.3.1",
"onetime": "^7.0.0",
"precise-now": "^3.0.0",
"stream-json": "^1.8.0",
"tsd": "^0.29.0",
Expand Down
10 changes: 4 additions & 6 deletions source/contents.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import {getAsyncIterable} from './stream.js';

export const getStreamContents = async (stream, {init, convertChunk, getSize, truncateChunk, addChunk, getFinalChunk, finalize}, {maxBuffer = Number.POSITIVE_INFINITY} = {}) => {
if (!isAsyncIterable(stream)) {
throw new Error('The first argument must be a Readable, a ReadableStream, or an async iterable.');
}
const asyncIterable = getAsyncIterable(stream);

const state = init();
state.length = 0;

try {
for await (const chunk of stream) {
for await (const chunk of asyncIterable) {
const chunkType = getChunkType(chunk);
const convertedChunk = convertChunk[chunkType](chunk, state);
appendChunk({convertedChunk, state, getSize, truncateChunk, addChunk, maxBuffer});
Expand Down Expand Up @@ -52,8 +52,6 @@ const addNewChunk = (convertedChunk, state, addChunk, newLength) => {
state.length = newLength;
};

const isAsyncIterable = stream => typeof stream === 'object' && stream !== null && typeof stream[Symbol.asyncIterator] === 'function';

const getChunkType = chunk => {
const typeOfChunk = typeof chunk;

Expand Down
66 changes: 66 additions & 0 deletions source/stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import {isReadableStream} from 'is-stream';

export const getAsyncIterable = stream => {
if (isReadableStream(stream, {checkOpen: false})) {
return getStreamIterable(stream);
}

if (typeof stream?.[Symbol.asyncIterator] !== 'function') {
throw new TypeError('The first argument must be a Readable, a ReadableStream, or an async iterable.');
}

return stream;
};

// The default iterable for Node.js streams does not allow for multiple readers at once, so we re-implement it
const getStreamIterable = async function * (stream) {
if (nodeImports === undefined) {
await loadNodeImports();
}

const controller = new AbortController();
handleStreamEnd(stream, controller);

try {
for await (const [chunk] of nodeImports.events.on(stream, 'data', {
signal: controller.signal,
highWatermark: stream.readableHighWaterMark,
})) {
yield chunk;
}
} catch (error) {
// `error` event directly emitted on stream
if (!controller.signal.aborted || error.cause === undefined) {
throw error;
// Stream failure via `controller.abort(error)` below, for example due to `stream.destroy(error)`
} else if (error.cause !== endError) {
throw error.cause;
}
// Otherwise, this returns successfully via `controller.abort(endError)` below, for example due to `stream` having completed.
// The `finally` block also runs when the caller throws, for example due to the `maxBuffer` option.
} finally {
stream.destroy();
}
};

const handleStreamEnd = async (stream, controller) => {
try {
await nodeImports.streamPromises.finished(stream, {cleanup: true, readable: true, writable: false, error: false});
controller.abort(endError);
} catch (error) {
controller.abort(error);
}
};

const endError = new Error('Internal error');

// Use dynamic imports to support browsers
const loadNodeImports = async () => {
const [events, streamPromises] = await Promise.all([
import('node:events'),
import('node:stream/promises'),
]);
nodeImports = {events, streamPromises};
};

let nodeImports;
2 changes: 2 additions & 0 deletions test/fixtures/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,5 @@ export const fixtureMultibyteString = '\u1000';
export const longMultibyteString = `${fixtureMultibyteString}\u1000`;

export const bigArray = Array.from({length: 1e5}, () => Math.floor(Math.random() * (2 ** 8)));

export const prematureClose = {code: 'ERR_STREAM_PREMATURE_CLOSE'};
Loading

0 comments on commit 9779d56

Please sign in to comment.