Skip to content

Commit

Permalink
fs: add support for async iterators to fs.writeFile
Browse files Browse the repository at this point in the history
  • Loading branch information
HiroyukiYagihashi committed May 8, 2021
1 parent 52e4fb5 commit 785d76d
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 21 deletions.
6 changes: 5 additions & 1 deletion doc/api/fs.md
Original file line number Diff line number Diff line change
Expand Up @@ -3879,6 +3879,9 @@ details.
<!-- YAML
added: v0.1.29
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/38525
description: add support for async iterators to `fs.writeFile`.
- version: v16.0.0
pr-url: https://github.com/nodejs/node/pull/37460
description: The error returned may be an `AggregateError` if more than one
Expand Down Expand Up @@ -3916,7 +3919,8 @@ changes:
-->
* `file` {string|Buffer|URL|integer} filename or file descriptor
* `data` {string|Buffer|TypedArray|DataView|Object}
* `data` {string|Buffer|TypedArray|DataView|Object
|AsyncIterable|Iterable|Stream}
* `options` {Object|string}
* `encoding` {string|null} **Default:** `'utf8'`
* `mode` {integer} **Default:** `0o666`
Expand Down
72 changes: 57 additions & 15 deletions lib/fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const {
StringPrototypeCharCodeAt,
StringPrototypeIndexOf,
StringPrototypeSlice,
SymbolAsyncIterator,
SymbolIterator,
} = primordials;

const { fs: constants } = internalBinding('constants');
Expand Down Expand Up @@ -85,6 +87,7 @@ const {
const { FSReqCallback } = binding;
const { toPathIfFileURL } = require('internal/url');
const internalUtil = require('internal/util');
const { isCustomIterable } = require('internal/streams/utils');
const {
constants: {
kIoMaxLength,
Expand Down Expand Up @@ -828,12 +831,12 @@ function write(fd, buffer, offset, length, position, callback) {
} else {
position = length;
}
length = 'utf8';
length = length || 'utf8';
}

const str = String(buffer);
validateEncoding(str, length);
callback = maybeCallback(position);
callback = maybeCallback(callback || position);

const req = new FSReqCallback();
req.oncomplete = wrapper;
Expand Down Expand Up @@ -2039,7 +2042,8 @@ function lutimesSync(path, atime, mtime) {
handleErrorFromBinding(ctx);
}

function writeAll(fd, isUserFd, buffer, offset, length, signal, callback) {
function writeAll(
fd, isUserFd, buffer, offset, length, signal, encoding, callback) {
if (signal?.aborted) {
const abortError = new AbortError();
if (isUserFd) {
Expand All @@ -2051,16 +2055,16 @@ function writeAll(fd, isUserFd, buffer, offset, length, signal, callback) {
}
return;
}
// write(fd, buffer, offset, length, position, callback)

if (isCustomIterable(buffer)) {
writeAllCustomIterable(
fd, isUserFd, buffer, offset, length, signal, encoding, callback)
.catch((reason) => { throw reason; });
return;
}
fs.write(fd, buffer, offset, length, null, (writeErr, written) => {
if (writeErr) {
if (isUserFd) {
callback(writeErr);
} else {
fs.close(fd, (err) => {
callback(aggregateTwoErrors(err, writeErr));
});
}
handleWriteAllErrorCallback(fd, isUserFd, writeErr, callback);
} else if (written === length) {
if (isUserFd) {
callback(null);
Expand All @@ -2070,11 +2074,43 @@ function writeAll(fd, isUserFd, buffer, offset, length, signal, callback) {
} else {
offset += written;
length -= written;
writeAll(fd, isUserFd, buffer, offset, length, signal, callback);
writeAll(
fd, isUserFd, buffer, offset, length, signal, encoding, callback);
}
});
}

async function writeAllCustomIterable(
fd, isUserFd, buffer, offset, length, signal, encoding, callback) {
const result = await buffer.next();
if (result.done) {
fs.close(fd, callback);
return;
}
const resultValue = result.value.toString();
fs.write(fd, resultValue, undefined,
isArrayBufferView(buffer) ? resultValue.byteLength : encoding,
null, (writeErr, _) => {
if (writeErr) {
handleWriteAllErrorCallback(fd, isUserFd, writeErr, callback);
} else {
writeAll(fd, isUserFd, buffer, offset,
length, signal, encoding, callback);
}
}
);
}

function handleWriteAllErrorCallback(fd, isUserFd, writeErr, callback) {
if (isUserFd) {
callback(writeErr);
} else {
fs.close(fd, (err) => {
callback(aggregateTwoErrors(err, writeErr));
});
}
}

/**
* Asynchronously writes data to the file.
* @param {string | Buffer | URL | number} path
Expand All @@ -2093,15 +2129,20 @@ function writeFile(path, data, options, callback) {
options = getOptions(options, { encoding: 'utf8', mode: 0o666, flag: 'w' });
const flag = options.flag || 'w';

if (!isArrayBufferView(data)) {
if (!isArrayBufferView(data) && !isCustomIterable(data)) {
validateStringAfterArrayBufferView(data, 'data');
data = Buffer.from(String(data), options.encoding || 'utf8');
}

if (isCustomIterable(data)) {
data = data[SymbolIterator]?.() ?? data[SymbolAsyncIterator]?.();
}

if (isFd(path)) {
const isUserFd = true;
const signal = options.signal;
writeAll(path, isUserFd, data, 0, data.byteLength, signal, callback);
writeAll(path, isUserFd, data,
0, data.byteLength, signal, options.encoding, callback);
return;
}

Expand All @@ -2114,7 +2155,8 @@ function writeFile(path, data, options, callback) {
} else {
const isUserFd = false;
const signal = options.signal;
writeAll(fd, isUserFd, data, 0, data.byteLength, signal, callback);
writeAll(fd, isUserFd, data,
0, data.byteLength, signal, options.encoding, callback);
}
});
}
Expand Down
6 changes: 1 addition & 5 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ const pathModule = require('path');
const { promisify } = require('internal/util');
const { EventEmitterMixin } = require('internal/event_target');
const { watch } = require('internal/fs/watchers');
const { isIterable } = require('internal/streams/utils');
const { isCustomIterable } = require('internal/streams/utils');
const assert = require('internal/assert');

const kHandle = Symbol('kHandle');
Expand Down Expand Up @@ -730,10 +730,6 @@ async function writeFile(path, data, options) {
writeFileHandle(fd, data, options.signal, options.encoding), fd.close);
}

function isCustomIterable(obj) {
return isIterable(obj) && !isArrayBufferView(obj) && typeof obj !== 'string';
}

async function appendFile(path, data, options) {
options = getOptions(options, { encoding: 'utf8', mode: 0o666, flag: 'a' });
options = copyObject(options);
Expand Down
6 changes: 6 additions & 0 deletions lib/internal/streams/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const {
SymbolAsyncIterator,
SymbolIterator,
} = primordials;
const { isArrayBufferView } = require('internal/util/types');

function isReadable(obj) {
return !!(obj && typeof obj.pipe === 'function' &&
Expand All @@ -27,7 +28,12 @@ function isIterable(obj, isAsync) {
typeof obj[SymbolIterator] === 'function';
}

function isCustomIterable(obj) {
return isIterable(obj) && !isArrayBufferView(obj) && typeof obj !== 'string';
}

module.exports = {
isCustomIterable,
isIterable,
isReadable,
isStream,
Expand Down
88 changes: 88 additions & 0 deletions test/parallel/test-fs-write-file-async-iterators.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const fs = require('fs');
const join = require('path').join;
const { Readable } = require('stream');

const tmpdir = require('../common/tmpdir');
tmpdir.refresh();

{
const filenameIterable = join(tmpdir.path, 'testIterable.txt');
const iterable = {
expected: 'abc',
*[Symbol.iterator]() {
yield 'a';
yield 'b';
yield 'c';
}
};

fs.writeFile(filenameIterable, iterable, common.mustSucceed(() => {
const data = fs.readFileSync(filenameIterable, 'utf-8');
assert.strictEqual(iterable.expected, data);
}));
}

{
const filenameBufferIterable = join(tmpdir.path, 'testBufferIterable.txt');
const bufferIterable = {
expected: 'abc',
*[Symbol.iterator]() {
yield Buffer.from('a');
yield Buffer.from('b');
yield Buffer.from('c');
}
};

fs.writeFile(
filenameBufferIterable, bufferIterable, common.mustSucceed(() => {
const data = fs.readFileSync(filenameBufferIterable, 'utf-8');
assert.strictEqual(bufferIterable.expected, data);
})
);
}


{
const filenameAsyncIterable = join(tmpdir.path, 'testAsyncIterable.txt');
const asyncIterable = {
expected: 'abc',
*[Symbol.asyncIterator]() {
yield 'a';
yield 'b';
yield 'c';
}
};

fs.writeFile(filenameAsyncIterable, asyncIterable, common.mustSucceed(() => {
const data = fs.readFileSync(filenameAsyncIterable, 'utf-8');
assert.strictEqual(asyncIterable.expected, data);
}));
}

{
const filenameStream = join(tmpdir.path, 'testStream.txt');
const stream = Readable.from(['a', 'b', 'c']);
const expected = 'abc';

fs.writeFile(filenameStream, stream, common.mustSucceed(() => {
const data = fs.readFileSync(filenameStream, 'utf-8');
assert.strictEqual(expected, data);
}));
}

{
const filenameStreamWithEncoding =
join(tmpdir.path, 'testStreamWithEncoding.txt');
const stream = Readable.from(['ümlaut', ' ', 'sechzig']);
const expected = 'ümlaut sechzig';

fs.writeFile(
filenameStreamWithEncoding, stream, 'latin1', common.mustSucceed(() => {
const data = fs.readFileSync(filenameStreamWithEncoding, 'latin1');
assert.strictEqual(expected, data);
})
);
}

0 comments on commit 785d76d

Please sign in to comment.