From 2bcf3bce4ec4fd38ed6e813f11f43bd5ebbf950f Mon Sep 17 00:00:00 2001 From: Rich Trott Date: Wed, 24 Feb 2021 11:30:50 -0800 Subject: [PATCH 1/3] stream: move duplicated code to an internal module Create a utils module for isIterable(), isReadable(), and isStream(). PR-URL: https://github.com/nodejs/node/pull/37508 Reviewed-By: Antoine du Hamel Reviewed-By: Benjamin Gruenbaum Reviewed-By: Darshan Sen Reviewed-By: Robert Nagy Reviewed-By: Matteo Collina --- lib/internal/streams/pipeline.js | 27 +++++---------------- lib/internal/streams/utils.js | 32 +++++++++++++++++++++++++ node.gyp | 1 + test/parallel/test-bootstrap-modules.js | 1 + 4 files changed, 40 insertions(+), 21 deletions(-) create mode 100644 lib/internal/streams/utils.js diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 134e9ea94fa797..6483e9829eddc3 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -7,7 +7,6 @@ const { ArrayIsArray, ReflectApply, SymbolAsyncIterator, - SymbolIterator, } = primordials; let eos; @@ -22,6 +21,12 @@ const { ERR_STREAM_DESTROYED } = require('internal/errors').codes; +const { + isIterable, + isReadable, + isStream, +} = require('internal/streams/utils'); + let EE; let PassThrough; let Readable; @@ -78,26 +83,6 @@ function popCallback(streams) { return streams.pop(); } -function isReadable(obj) { - return !!(obj && typeof obj.pipe === 'function'); -} - -function isWritable(obj) { - return !!(obj && typeof obj.write === 'function'); -} - -function isStream(obj) { - return isReadable(obj) || isWritable(obj); -} - -function isIterable(obj, isAsync) { - if (!obj) return false; - if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function'; - if (isAsync === false) return typeof obj[SymbolIterator] === 'function'; - return typeof obj[SymbolAsyncIterator] === 'function' || - typeof obj[SymbolIterator] === 'function'; -} - function makeAsyncIterable(val) { if (isIterable(val)) { return val; diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js new file mode 100644 index 00000000000000..08c196802780b8 --- /dev/null +++ b/lib/internal/streams/utils.js @@ -0,0 +1,32 @@ +'use strict'; + +const { + SymbolAsyncIterator, + SymbolIterator, +} = primordials; + +function isReadable(obj) { + return !!(obj && typeof obj.pipe === 'function'); +} + +function isWritable(obj) { + return !!(obj && typeof obj.write === 'function'); +} + +function isStream(obj) { + return isReadable(obj) || isWritable(obj); +} + +function isIterable(obj, isAsync) { + if (!obj) return false; + if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function'; + if (isAsync === false) return typeof obj[SymbolIterator] === 'function'; + return typeof obj[SymbolAsyncIterator] === 'function' || + typeof obj[SymbolIterator] === 'function'; +} + +module.exports = { + isIterable, + isReadable, + isStream, +}; diff --git a/node.gyp b/node.gyp index 2197f0fd755a15..35b7912afa786c 100644 --- a/node.gyp +++ b/node.gyp @@ -245,6 +245,7 @@ 'lib/internal/streams/state.js', 'lib/internal/streams/pipeline.js', 'lib/internal/streams/end-of-stream.js', + 'lib/internal/streams/utils.js', 'deps/v8/tools/splaytree.js', 'deps/v8/tools/codemap.js', 'deps/v8/tools/consarray.js', diff --git a/test/parallel/test-bootstrap-modules.js b/test/parallel/test-bootstrap-modules.js index 0544f8eec4b54e..683171fe3c7b20 100644 --- a/test/parallel/test-bootstrap-modules.js +++ b/test/parallel/test-bootstrap-modules.js @@ -79,6 +79,7 @@ const expectedModules = new Set([ 'NativeModule internal/process/warning', 'NativeModule internal/querystring', 'NativeModule internal/source_map/source_map_cache', + 'NativeModule internal/streams/utils', 'NativeModule internal/timers', 'NativeModule internal/url', 'NativeModule internal/util', From c4f669828e78428e4520f3ac8b5cc22040696858 Mon Sep 17 00:00:00 2001 From: HiroyukiYagihashi Date: Sun, 21 Feb 2021 22:14:21 +0900 Subject: [PATCH 2/3] fs: add support for async iterators to `fsPromises.writeFile` Fixes: https://github.com/nodejs/node/issues/37391 PR-URL: https://github.com/nodejs/node/pull/37490 Reviewed-By: Antoine du Hamel Reviewed-By: James M Snell --- doc/api/fs.md | 6 +- lib/internal/fs/promises.js | 36 ++++-- test/parallel/test-fs-append-file.js | 2 +- test/parallel/test-fs-promises-writefile.js | 116 +++++++++++++++++++- 4 files changed, 144 insertions(+), 16 deletions(-) diff --git a/doc/api/fs.md b/doc/api/fs.md index 2143679900651f..f4ff98b1140328 100644 --- a/doc/api/fs.md +++ b/doc/api/fs.md @@ -1234,6 +1234,9 @@ All the [caveats][] for `fs.watch()` also apply to `fsPromises.watch()`. * `file` {string|Buffer|URL|FileHandle} filename or `FileHandle` -* `data` {string|Buffer|Uint8Array|Object} +* `data` {string|Buffer|Uint8Array|Object|AsyncIterable|Iterable + |Stream} * `options` {Object|string} * `encoding` {string|null} **Default:** `'utf8'` * `mode` {integer} **Default:** `0o666` diff --git a/lib/internal/fs/promises.js b/lib/internal/fs/promises.js index 767195cc2ec738..0919666d93fb38 100644 --- a/lib/internal/fs/promises.js +++ b/lib/internal/fs/promises.js @@ -34,7 +34,7 @@ const { const binding = internalBinding('fs'); const { Buffer } = require('buffer'); -const { codes, hideStackFrames } = require('internal/errors'); +const { AbortError, codes, hideStackFrames } = require('internal/errors'); const { ERR_FS_FILE_TOO_LARGE, ERR_INVALID_ARG_TYPE, @@ -73,6 +73,7 @@ const { const pathModule = require('path'); const { promisify } = require('internal/util'); const { watch } = require('internal/fs/watchers'); +const { isIterable } = require('internal/streams/utils'); const kHandle = Symbol('kHandle'); const kFd = Symbol('kFd'); @@ -254,8 +255,23 @@ async function fsCall(fn, handle, ...args) { } } -async function writeFileHandle(filehandle, data, signal) { - // `data` could be any kind of typed array. +function checkAborted(signal) { + if (signal && signal.aborted) + throw new AbortError(); +} + +async function writeFileHandle(filehandle, data, signal, encoding) { + checkAborted(signal); + if (isCustomIterable(data)) { + for await (const buf of data) { + checkAborted(signal); + await write( + filehandle, buf, undefined, + isArrayBufferView(buf) ? buf.length : encoding); + checkAborted(signal); + } + return; + } data = new Uint8Array(data.buffer, data.byteOffset, data.byteLength); let remaining = data.length; if (remaining === 0) return; @@ -403,7 +419,7 @@ async function readv(handle, buffers, position) { } async function write(handle, buffer, offset, length, position) { - if (buffer.length === 0) + if (buffer && buffer.length === 0) return { bytesWritten: 0, buffer }; if (isArrayBufferView(buffer)) { @@ -645,22 +661,26 @@ async function writeFile(path, data, options) { options = getOptions(options, { encoding: 'utf8', mode: 0o666, flag: 'w' }); const flag = options.flag || 'w'; - if (!isArrayBufferView(data)) { + if (!isArrayBufferView(data) && !isCustomIterable(data)) { validateStringAfterArrayBufferView(data, 'data'); data = Buffer.from(data, options.encoding || 'utf8'); } validateAbortSignal(options.signal); if (path instanceof FileHandle) - return writeFileHandle(path, data, options.signal); + return writeFileHandle(path, data, options.signal, options.encoding); if (options.signal?.aborted) { throw lazyDOMException('The operation was aborted', 'AbortError'); } const fd = await open(path, flag, options.mode); - const { signal } = options; - return PromisePrototypeFinally(writeFileHandle(fd, data, signal), fd.close); + return PromisePrototypeFinally( + writeFileHandle(fd, data, options.signal, options.encoding), fd.close); +} + +function isCustomIterable(obj) { + return isIterable(obj) && !isArrayBufferView(obj) && typeof obj !== 'string'; } async function appendFile(path, data, options) { diff --git a/test/parallel/test-fs-append-file.js b/test/parallel/test-fs-append-file.js index a191f8b20693c1..70919830f6cb0b 100644 --- a/test/parallel/test-fs-append-file.js +++ b/test/parallel/test-fs-append-file.js @@ -121,7 +121,7 @@ const throwNextTick = (e) => { process.nextTick(() => { throw e; }); }; } // Test that appendFile does not accept invalid data type (callback API). -[false, 5, {}, [], null, undefined].forEach(async (data) => { +[false, 5, {}, null, undefined].forEach(async (data) => { const errObj = { code: 'ERR_INVALID_ARG_TYPE', message: /"data"|"buffer"/ diff --git a/test/parallel/test-fs-promises-writefile.js b/test/parallel/test-fs-promises-writefile.js index f5f3e8dad38469..3caf886ef2662c 100644 --- a/test/parallel/test-fs-promises-writefile.js +++ b/test/parallel/test-fs-promises-writefile.js @@ -8,6 +8,7 @@ const path = require('path'); const tmpdir = require('../common/tmpdir'); const assert = require('assert'); const tmpDir = tmpdir.path; +const { Readable } = require('stream'); tmpdir.refresh(); @@ -15,6 +16,39 @@ const dest = path.resolve(tmpDir, 'tmp.txt'); const otherDest = path.resolve(tmpDir, 'tmp-2.txt'); const buffer = Buffer.from('abc'.repeat(1000)); const buffer2 = Buffer.from('xyz'.repeat(1000)); +const stream = Readable.from(['a', 'b', 'c']); +const stream2 = Readable.from(['ümlaut', ' ', 'sechzig']); +const iterable = { + expected: 'abc', + *[Symbol.iterator]() { + yield 'a'; + yield 'b'; + yield 'c'; + } +}; +function iterableWith(value) { + return { + *[Symbol.iterator]() { + yield value; + } + }; +} +const bufferIterable = { + expected: 'abc', + *[Symbol.iterator]() { + yield Buffer.from('a'); + yield Buffer.from('b'); + yield Buffer.from('c'); + } +}; +const asyncIterable = { + expected: 'abc', + async* [Symbol.asyncIterator]() { + yield 'a'; + yield 'b'; + yield 'c'; + } +}; async function doWrite() { await fsPromises.writeFile(dest, buffer); @@ -22,6 +56,67 @@ async function doWrite() { assert.deepStrictEqual(data, buffer); } +async function doWriteStream() { + await fsPromises.writeFile(dest, stream); + const expected = 'abc'; + const data = fs.readFileSync(dest, 'utf-8'); + assert.deepStrictEqual(data, expected); +} + +async function doWriteStreamWithCancel() { + const controller = new AbortController(); + const { signal } = controller; + process.nextTick(() => controller.abort()); + assert.rejects(fsPromises.writeFile(otherDest, stream, { signal }), { + name: 'AbortError' + }); +} + +async function doWriteIterable() { + await fsPromises.writeFile(dest, iterable); + const data = fs.readFileSync(dest, 'utf-8'); + assert.deepStrictEqual(data, iterable.expected); +} + +async function doWriteInvalidIterable() { + await Promise.all( + [42, 42n, {}, Symbol('42'), true, undefined, null, NaN].map((value) => + assert.rejects(fsPromises.writeFile(dest, iterableWith(value)), { + code: 'ERR_INVALID_ARG_TYPE', + }) + ) + ); +} + +async function doWriteIterableWithEncoding() { + await fsPromises.writeFile(dest, stream2, 'latin1'); + const expected = 'ümlaut sechzig'; + const data = fs.readFileSync(dest, 'latin1'); + assert.deepStrictEqual(data, expected); +} + +async function doWriteBufferIterable() { + await fsPromises.writeFile(dest, bufferIterable); + const data = fs.readFileSync(dest, 'utf-8'); + assert.deepStrictEqual(data, bufferIterable.expected); +} + +async function doWriteAsyncIterable() { + await fsPromises.writeFile(dest, asyncIterable); + const data = fs.readFileSync(dest, 'utf-8'); + assert.deepStrictEqual(data, asyncIterable.expected); +} + +async function doWriteInvalidValues() { + await Promise.all( + [42, 42n, {}, Symbol('42'), true, undefined, null, NaN].map((value) => + assert.rejects(fsPromises.writeFile(dest, value), { + code: 'ERR_INVALID_ARG_TYPE', + }) + ) + ); +} + async function doWriteWithCancel() { const controller = new AbortController(); const { signal } = controller; @@ -51,9 +146,18 @@ async function doReadWithEncoding() { assert.deepStrictEqual(data, syncData); } -doWrite() - .then(doWriteWithCancel) - .then(doAppend) - .then(doRead) - .then(doReadWithEncoding) - .then(common.mustCall()); +(async () => { + await doWrite(); + await doWriteWithCancel(); + await doAppend(); + await doRead(); + await doReadWithEncoding(); + await doWriteStream(); + await doWriteStreamWithCancel(); + await doWriteIterable(); + await doWriteInvalidIterable(); + await doWriteIterableWithEncoding(); + await doWriteBufferIterable(); + await doWriteAsyncIterable(); + await doWriteInvalidValues(); +})().then(common.mustCall()); From 17b6ad6f854e9763dd6b59b67fd376249935c0fd Mon Sep 17 00:00:00 2001 From: Antoine du Hamel Date: Sat, 21 Aug 2021 17:13:42 +0200 Subject: [PATCH 3/3] fs: add docs and tests for `AsyncIterable` support in `fh.writeFile` Refs: https://github.com/nodejs/node/pull/37490 PR-URL: https://github.com/nodejs/node/pull/39836 Reviewed-By: Nitzan Uziely --- doc/api/fs.md | 11 +- .../test-fs-promises-file-handle-writeFile.js | 170 +++++++++++++++++- test/parallel/test-fs-promises-writefile.js | 14 +- 3 files changed, 177 insertions(+), 18 deletions(-) diff --git a/doc/api/fs.md b/doc/api/fs.md index f4ff98b1140328..e5c04d97541aa2 100644 --- a/doc/api/fs.md +++ b/doc/api/fs.md @@ -495,6 +495,9 @@ the end of the file. -* `data` {string|Buffer|Uint8Array|Object} +* `data` {string|Buffer|Uint8Array|Object|AsyncIterable|Iterable + |Stream} * `options` {Object|string} * `encoding` {string|null} The expected character encoding when `data` is a string. **Default:** `'utf8'` * Returns: {Promise} Asynchronously writes data to a file, replacing the file if it already exists. -`data` can be a string, a buffer, or an object with an own `toString` function +`data` can be a string, a buffer, an {AsyncIterable} or {Iterable} object, or an +object with an own `toString` function property. The promise is resolved with no arguments upon success. If `options` is a string, then it specifies the `encoding`. @@ -1236,7 +1241,7 @@ added: v10.0.0 changes: - version: REPLACEME pr-url: https://github.com/nodejs/node/pull/37490 - description: The `data` argument supports `AsyncIterable`, `Iterable` & `Stream`. + description: The `data` argument supports `AsyncIterable`, `Iterable` and `Stream`. - version: v14.17.0 pr-url: https://github.com/nodejs/node/pull/35993 description: The options argument may include an AbortSignal to abort an diff --git a/test/parallel/test-fs-promises-file-handle-writeFile.js b/test/parallel/test-fs-promises-file-handle-writeFile.js index a4ae7fd054b53d..46c9019bc8d8dd 100644 --- a/test/parallel/test-fs-promises-file-handle-writeFile.js +++ b/test/parallel/test-fs-promises-file-handle-writeFile.js @@ -9,6 +9,7 @@ const common = require('../common'); const fs = require('fs'); const { open, writeFile } = fs.promises; const path = require('path'); +const { Readable } = require('stream'); const tmpdir = require('../common/tmpdir'); const assert = require('assert'); const tmpDir = tmpdir.path; @@ -18,13 +19,15 @@ tmpdir.refresh(); async function validateWriteFile() { const filePathForHandle = path.resolve(tmpDir, 'tmp-write-file2.txt'); const fileHandle = await open(filePathForHandle, 'w+'); - const buffer = Buffer.from('Hello world'.repeat(100), 'utf8'); + try { + const buffer = Buffer.from('Hello world'.repeat(100), 'utf8'); - await fileHandle.writeFile(buffer); - const readFileData = fs.readFileSync(filePathForHandle); - assert.deepStrictEqual(buffer, readFileData); - - await fileHandle.close(); + await fileHandle.writeFile(buffer); + const readFileData = fs.readFileSync(filePathForHandle); + assert.deepStrictEqual(buffer, readFileData); + } finally { + await fileHandle.close(); + } } // Signal aborted while writing file @@ -40,6 +43,155 @@ async function doWriteAndCancel() { }); } -validateWriteFile() - .then(doWriteAndCancel) - .then(common.mustCall()); +const dest = path.resolve(tmpDir, 'tmp.txt'); +const otherDest = path.resolve(tmpDir, 'tmp-2.txt'); +const stream = Readable.from(['a', 'b', 'c']); +const stream2 = Readable.from(['ümlaut', ' ', 'sechzig']); +const iterable = { + expected: 'abc', + *[Symbol.iterator]() { + yield 'a'; + yield 'b'; + yield 'c'; + } +}; +function iterableWith(value) { + return { + *[Symbol.iterator]() { + yield value; + } + }; +} +const bufferIterable = { + expected: 'abc', + *[Symbol.iterator]() { + yield Buffer.from('a'); + yield Buffer.from('b'); + yield Buffer.from('c'); + } +}; +const asyncIterable = { + expected: 'abc', + async* [Symbol.asyncIterator]() { + yield 'a'; + yield 'b'; + yield 'c'; + } +}; + +async function doWriteStream() { + const fileHandle = await open(dest, 'w+'); + try { + await fileHandle.writeFile(stream); + const expected = 'abc'; + const data = fs.readFileSync(dest, 'utf-8'); + assert.deepStrictEqual(data, expected); + } finally { + await fileHandle.close(); + } +} + +async function doWriteStreamWithCancel() { + const controller = new AbortController(); + const { signal } = controller; + process.nextTick(() => controller.abort()); + const fileHandle = await open(otherDest, 'w+'); + try { + await assert.rejects( + fileHandle.writeFile(stream, { signal }), + { name: 'AbortError' } + ); + } finally { + await fileHandle.close(); + } +} + +async function doWriteIterable() { + const fileHandle = await open(dest, 'w+'); + try { + await fileHandle.writeFile(iterable); + const data = fs.readFileSync(dest, 'utf-8'); + assert.deepStrictEqual(data, iterable.expected); + } finally { + await fileHandle.close(); + } +} + +async function doWriteInvalidIterable() { + const fileHandle = await open(dest, 'w+'); + try { + await Promise.all( + [42, 42n, {}, Symbol('42'), true, undefined, null, NaN].map((value) => + assert.rejects( + fileHandle.writeFile(iterableWith(value)), + { code: 'ERR_INVALID_ARG_TYPE' } + ) + ) + ); + } finally { + await fileHandle.close(); + } +} + +async function doWriteIterableWithEncoding() { + const fileHandle = await open(dest, 'w+'); + try { + await fileHandle.writeFile(stream2, 'latin1'); + const expected = 'ümlaut sechzig'; + const data = fs.readFileSync(dest, 'latin1'); + assert.deepStrictEqual(data, expected); + } finally { + await fileHandle.close(); + } +} + +async function doWriteBufferIterable() { + const fileHandle = await open(dest, 'w+'); + try { + await fileHandle.writeFile(bufferIterable); + const data = fs.readFileSync(dest, 'utf-8'); + assert.deepStrictEqual(data, bufferIterable.expected); + } finally { + await fileHandle.close(); + } +} + +async function doWriteAsyncIterable() { + const fileHandle = await open(dest, 'w+'); + try { + await fileHandle.writeFile(asyncIterable); + const data = fs.readFileSync(dest, 'utf-8'); + assert.deepStrictEqual(data, asyncIterable.expected); + } finally { + await fileHandle.close(); + } +} + +async function doWriteInvalidValues() { + const fileHandle = await open(dest, 'w+'); + try { + await Promise.all( + [42, 42n, {}, Symbol('42'), true, undefined, null, NaN].map((value) => + assert.rejects( + fileHandle.writeFile(value), + { code: 'ERR_INVALID_ARG_TYPE' } + ) + ) + ); + } finally { + await fileHandle.close(); + } +} + +(async () => { + await validateWriteFile(); + await doWriteAndCancel(); + await doWriteStream(); + await doWriteStreamWithCancel(); + await doWriteIterable(); + await doWriteInvalidIterable(); + await doWriteIterableWithEncoding(); + await doWriteBufferIterable(); + await doWriteAsyncIterable(); + await doWriteInvalidValues(); +})().then(common.mustCall()); diff --git a/test/parallel/test-fs-promises-writefile.js b/test/parallel/test-fs-promises-writefile.js index 3caf886ef2662c..2be52ccbaa875f 100644 --- a/test/parallel/test-fs-promises-writefile.js +++ b/test/parallel/test-fs-promises-writefile.js @@ -67,9 +67,10 @@ async function doWriteStreamWithCancel() { const controller = new AbortController(); const { signal } = controller; process.nextTick(() => controller.abort()); - assert.rejects(fsPromises.writeFile(otherDest, stream, { signal }), { - name: 'AbortError' - }); + await assert.rejects( + fsPromises.writeFile(otherDest, stream, { signal }), + { name: 'AbortError' } + ); } async function doWriteIterable() { @@ -121,9 +122,10 @@ async function doWriteWithCancel() { const controller = new AbortController(); const { signal } = controller; process.nextTick(() => controller.abort()); - assert.rejects(fsPromises.writeFile(otherDest, buffer, { signal }), { - name: 'AbortError' - }); + await assert.rejects( + fsPromises.writeFile(otherDest, buffer, { signal }), + { name: 'AbortError' } + ); } async function doAppend() {