diff --git a/doc/api/stream.md b/doc/api/stream.md index 935e4004b38bee..e6e42399b9bef5 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1835,6 +1835,46 @@ await dnsResults.forEach((result) => { console.log('done'); // Stream has finished ``` +### `readable.toArray([options])` + + + +> Stability: 1 - Experimental + +* `options` {Object} + * `signal` {AbortSignal} allows cancelling the toArray operation if the + signal is aborted. +* Returns: {Promise} a promise containing an array (if the stream is in + object mode) or Buffer with the contents of the stream. + +This method allows easily obtaining the contents of a stream. If the +stream is in [object mode][object-mode] an array of its contents is returned. +If the stream is not in object mode a Buffer containing its data is returned. + +As this method reads the entire stream into memory, it negates the benefits of +streams. It's intended for interoperability and convenience, not as the primary +way to consume streams. + +```mjs +import { Readable } from 'stream'; +import { Resolver } from 'dns/promises'; + +await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4] + +// Make dns queries concurrently using .map and collect +// the results into an aray using toArray +const dnsResults = await Readable.from([ + 'nodejs.org', + 'openjsf.org', + 'www.linuxfoundation.org', +]).map(async (domain) => { + const { address } = await resolver.resolve4(domain, { ttl: true }); + return address; +}, { concurrency: 2 }).toArray(); +``` + ### Duplex and transform streams #### Class: `stream.Duplex` diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index c9581f7b6dfe6c..2649966fd403ac 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -1,6 +1,8 @@ 'use strict'; const { AbortController } = require('internal/abort_controller'); +const { Buffer } = require('buffer'); + const { codes: { ERR_INVALID_ARG_TYPE, @@ -10,6 +12,7 @@ const { const { validateInteger } = require('internal/validators'); const { + ArrayPrototypePush, MathFloor, Promise, PromiseReject, @@ -174,6 +177,19 @@ async function * filter(fn, options) { yield* this.map(filterFn, options); } +async function toArray(options) { + const result = []; + for await (const val of this) { + if (options?.signal?.aborted) { + throw new AbortError({ cause: options.signal.reason }); + } + ArrayPrototypePush(result, val); + } + if (!this.readableObjectMode) { + return Buffer.concat(result); + } + return result; +} module.exports.streamReturningOperators = { filter, map, @@ -181,4 +197,5 @@ module.exports.streamReturningOperators = { module.exports.promiseReturningOperators = { forEach, + toArray, }; diff --git a/test/parallel/test-stream-toArray.js b/test/parallel/test-stream-toArray.js new file mode 100644 index 00000000000000..3bd15e7c0fbf34 --- /dev/null +++ b/test/parallel/test-stream-toArray.js @@ -0,0 +1,79 @@ +'use strict'; + +const common = require('../common'); +const { + Readable, +} = require('stream'); +const assert = require('assert'); + +{ + // Works on a synchronous stream + (async () => { + const tests = [ + [], + [1], + [1, 2, 3], + Array(100).fill().map((_, i) => i), + ]; + for (const test of tests) { + const stream = Readable.from(test); + const result = await stream.toArray(); + assert.deepStrictEqual(result, test); + } + })().then(common.mustCall()); +} + +{ + // Works on a non-object-mode stream and flattens it + (async () => { + const stream = Readable.from( + [Buffer.from([1, 2, 3]), Buffer.from([4, 5, 6])] + , { objectMode: false }); + const result = await stream.toArray(); + assert.strictEqual(Buffer.isBuffer(result), true); + assert.deepStrictEqual(Array.from(result), [1, 2, 3, 4, 5, 6]); + })().then(common.mustCall()); +} + +{ + // Works on an asynchronous stream + (async () => { + const tests = [ + [], + [1], + [1, 2, 3], + Array(100).fill().map((_, i) => i), + ]; + for (const test of tests) { + const stream = Readable.from(test).map((x) => Promise.resolve(x)); + const result = await stream.toArray(); + assert.deepStrictEqual(result, test); + } + })().then(common.mustCall()); +} + +{ + // Support for AbortSignal + const ac = new AbortController(); + let stream; + assert.rejects(async () => { + stream = Readable.from([1, 2, 3]).map(async (x) => { + if (x === 3) { + await new Promise(() => {}); // Explicitly do not pass signal here + } + return Promise.resolve(x); + }); + await stream.toArray({ signal: ac.signal }); + }, { + name: 'AbortError', + }).then(common.mustCall(() => { + // Only stops toArray, does not destory the stream + assert(stream.destroyed, false); + })); + ac.abort(); +} +{ + // Test result is a Promise + const result = Readable.from([1, 2, 3, 4, 5]).toArray(); + assert.strictEqual(result instanceof Promise, true); +}