Skip to content

Commit

Permalink
child_process,cluster: allow using V8 serialization API
Browse files Browse the repository at this point in the history
Add an `serialization` option that allows child process IPC to
use the (typically more powerful) V8 serialization API.

Fixes: #10965

PR-URL: #30162
Reviewed-By: Colin Ihrig <[email protected]>
Reviewed-By: Gus Caplan <[email protected]>
Reviewed-By: David Carlier <[email protected]>
Reviewed-By: Michaël Zasso <[email protected]>
  • Loading branch information
addaleax authored and BethGriggs committed Feb 6, 2020
1 parent b87ae6d commit df1e183
Show file tree
Hide file tree
Showing 13 changed files with 304 additions and 39 deletions.
11 changes: 10 additions & 1 deletion benchmark/cluster/echo.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,25 @@ if (cluster.isMaster) {
workers: [1],
payload: ['string', 'object'],
sendsPerBroadcast: [1, 10],
serialization: ['json', 'advanced'],
n: [1e5]
});

function main({ n, workers, sendsPerBroadcast, payload }) {
function main({
n,
workers,
sendsPerBroadcast,
payload,
serialization
}) {
const expectedPerBroadcast = sendsPerBroadcast * workers;
var readies = 0;
var broadcasts = 0;
var msgCount = 0;
var data;

cluster.settings.serialization = serialization;

switch (payload) {
case 'string':
data = 'hello world!';
Expand Down
39 changes: 39 additions & 0 deletions doc/api/child_process.md
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,9 @@ arbitrary command execution.**
<!-- YAML
added: v0.5.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/30162
description: The `serialization` option is supported now.
- version: v8.0.0
pr-url: https://github.com/nodejs/node/pull/10866
description: The `stdio` option can now be a string.
Expand All @@ -340,6 +343,9 @@ changes:
* `execPath` {string} Executable used to create the child process.
* `execArgv` {string[]} List of string arguments passed to the executable.
**Default:** `process.execArgv`.
* `serialization` {string} Specify the kind of serialization used for sending
messages between processes. Possible values are `'json'` and `'advanced'`.
See [Advanced Serialization][] for more details. **Default:** `'json'`.
* `silent` {boolean} If `true`, stdin, stdout, and stderr of the child will be
piped to the parent, otherwise they will be inherited from the parent, see
the `'pipe'` and `'inherit'` options for [`child_process.spawn()`][]'s
Expand Down Expand Up @@ -386,6 +392,9 @@ The `shell` option available in [`child_process.spawn()`][] is not supported by
<!-- YAML
added: v0.1.90
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/30162
description: The `serialization` option is supported now.
- version: v8.8.0
pr-url: https://github.com/nodejs/node/pull/15380
description: The `windowsHide` option is supported now.
Expand All @@ -411,6 +420,9 @@ changes:
[`options.detached`][]).
* `uid` {number} Sets the user identity of the process (see setuid(2)).
* `gid` {number} Sets the group identity of the process (see setgid(2)).
* `serialization` {string} Specify the kind of serialization used for sending
messages between processes. Possible values are `'json'` and `'advanced'`.
See [Advanced Serialization][] for more details. **Default:** `'json'`.
* `shell` {boolean|string} If `true`, runs `command` inside of a shell. Uses
`'/bin/sh'` on Unix, and `process.env.ComSpec` on Windows. A different
shell can be specified as a string. See [Shell Requirements][] and
Expand Down Expand Up @@ -998,6 +1010,11 @@ The `'message'` event is triggered when a child process uses
The message goes through serialization and parsing. The resulting
message might not be the same as what is originally sent.

If the `serialization` option was set to `'advanced'` used when spawning the
child process, the `message` argument can contain data that JSON is not able
to represent.
See [Advanced Serialization][] for more details.

### `subprocess.channel`
<!-- YAML
added: v7.1.0
Expand Down Expand Up @@ -1474,6 +1491,26 @@ the same requirement. Thus, in `child_process` functions where a shell can be
spawned, `'cmd.exe'` is used as a fallback if `process.env.ComSpec` is
unavailable.

## Advanced Serialization
<!-- YAML
added: REPLACEME
-->

Child processes support a serialization mechanism for IPC that is based on the
[serialization API of the `v8` module][v8.serdes], based on the
[HTML structured clone algorithm][]. This is generally more powerful and
supports more built-in JavaScript object types, such as `BigInt`, `Map`
and `Set`, `ArrayBuffer` and `TypedArray`, `Buffer`, `Error`, `RegExp` etc.

However, this format is not a full superset of JSON, and e.g. properties set on
objects of such built-in types will not be passed on through the serialization
step. Additionally, performance may not be equivalent to that of JSON, depending
on the structure of the passed data.
Therefore, this feature requires opting in by setting the
`serialization` option to `'advanced'` when calling [`child_process.spawn()`][]
or [`child_process.fork()`][].

[Advanced Serialization]: #child_process_advanced_serialization
[`'disconnect'`]: process.html#process_event_disconnect
[`'error'`]: #child_process_event_error
[`'exit'`]: #child_process_event_exit
Expand Down Expand Up @@ -1507,5 +1544,7 @@ unavailable.
[`subprocess.stdout`]: #child_process_subprocess_stdout
[`util.promisify()`]: util.html#util_util_promisify_original
[Default Windows Shell]: #child_process_default_windows_shell
[HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm
[Shell Requirements]: #child_process_shell_requirements
[synchronous counterparts]: #child_process_synchronous_process_creation
[v8.serdes]: v8.html#v8_serialization_api
8 changes: 8 additions & 0 deletions doc/api/cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,9 @@ values are `'rr'` and `'none'`.
<!-- YAML
added: v0.7.1
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/30162
description: The `serialization` option is supported now.
- version: v9.5.0
pr-url: https://github.com/nodejs/node/pull/18399
description: The `cwd` option is supported now.
Expand All @@ -746,6 +749,10 @@ changes:
**Default:** `process.argv.slice(2)`.
* `cwd` {string} Current working directory of the worker process. **Default:**
`undefined` (inherits from parent process).
* `serialization` {string} Specify the kind of serialization used for sending
messages between processes. Possible values are `'json'` and `'advanced'`.
See [Advanced Serialization for `child_process`][] for more details.
**Default:** `false`.
* `silent` {boolean} Whether or not to send output to parent's stdio.
**Default:** `false`.
* `stdio` {Array} Configures the stdio of forked processes. Because the
Expand Down Expand Up @@ -874,4 +881,5 @@ socket.on('data', (id) => {
[`process` event: `'message'`]: process.html#process_event_message
[`server.close()`]: net.html#net_event_close
[`worker.exitedAfterDisconnect`]: #cluster_worker_exitedafterdisconnect
[Advanced Serialization for `child_process`]: child_process.html#child_process_advanced_serialization
[Child Process module]: child_process.html#child_process_child_process_fork_modulepath_args_options
6 changes: 6 additions & 0 deletions doc/api/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ the child process.
The message goes through serialization and parsing. The resulting message might
not be the same as what is originally sent.

If the `serialization` option was set to `advanced` used when spawning the
process, the `message` argument can contain data that JSON is not able
to represent.
See [Advanced Serialization for `child_process`][] for more details.

### Event: `'multipleResolves'`
<!-- YAML
added: v10.12.0
Expand Down Expand Up @@ -2457,6 +2462,7 @@ cases:
[`require.resolve()`]: modules.html#modules_require_resolve_request_options
[`subprocess.kill()`]: child_process.html#child_process_subprocess_kill_signal
[`v8.setFlagsFromString()`]: v8.html#v8_v8_setflagsfromstring_flags
[Advanced Serialization for `child_process`]: child_process.html#child_process_advanced_serialization
[Android building]: https://github.com/nodejs/node/blob/master/BUILDING.md#androidandroid-based-devices-eg-firefox-os
[Child Process]: child_process.html
[Cluster]: cluster.html
Expand Down
7 changes: 4 additions & 3 deletions lib/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ function fork(modulePath /* , args, options */) {
return spawn(options.execPath, args, options);
}

function _forkChild(fd) {
function _forkChild(fd, serializationMode) {
// set process.send()
const p = new Pipe(PipeConstants.IPC);
p.open(fd);
p.unref();
const control = setupChannel(process, p);
const control = setupChannel(process, p, serializationMode);
process.on('newListener', function onNewListener(name) {
if (name === 'message' || name === 'disconnect') control.ref();
});
Expand Down Expand Up @@ -547,7 +547,8 @@ function spawn(file, args, options) {
envPairs: opts.envPairs,
stdio: options.stdio,
uid: options.uid,
gid: options.gid
gid: options.gid,
serialization: options.serialization,
});

return child;
Expand Down
6 changes: 5 additions & 1 deletion lib/internal/bootstrap/pre_execution.js
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,11 @@ function setupChildProcessIpcChannel() {
// Make sure it's not accidentally inherited by child processes.
delete process.env.NODE_CHANNEL_FD;

require('child_process')._forkChild(fd);
const serializationMode =
process.env.NODE_CHANNEL_SERIALIZATION_MODE || 'json';
delete process.env.NODE_CHANNEL_SERIALIZATION_MODE;

require('child_process')._forkChild(fd, serializationMode);
assert(process.send);
}
}
Expand Down
59 changes: 27 additions & 32 deletions lib/internal/child_process.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';

const { JSON, Object } = primordials;
const { Object } = primordials;

const {
errnoException,
Expand Down Expand Up @@ -55,8 +55,6 @@ const {

const { SocketListSend, SocketListReceive } = SocketList;

// Lazy loaded for startup performance.
let StringDecoder;
// Lazy loaded for startup performance and to allow monkey patching of
// internalBinding('http_parser').HTTPParser.
let freeParser;
Expand Down Expand Up @@ -343,6 +341,15 @@ ChildProcess.prototype.spawn = function(options) {
const ipcFd = stdio.ipcFd;
stdio = options.stdio = stdio.stdio;

if (options.serialization !== undefined &&
options.serialization !== 'json' &&
options.serialization !== 'advanced') {
throw new ERR_INVALID_OPT_VALUE('options.serialization',
options.serialization);
}

const serialization = options.serialization || 'json';

if (ipc !== undefined) {
// Let child process know about opened IPC channel
if (options.envPairs === undefined)
Expand All @@ -353,7 +360,8 @@ ChildProcess.prototype.spawn = function(options) {
options.envPairs);
}

options.envPairs.push('NODE_CHANNEL_FD=' + ipcFd);
options.envPairs.push(`NODE_CHANNEL_FD=${ipcFd}`);
options.envPairs.push(`NODE_CHANNEL_SERIALIZATION_MODE=${serialization}`);
}

validateString(options.file, 'options.file');
Expand Down Expand Up @@ -446,7 +454,7 @@ ChildProcess.prototype.spawn = function(options) {
this.stdio.push(stdio[i].socket === undefined ? null : stdio[i].socket);

// Add .send() method and start listening for IPC data
if (ipc !== undefined) setupChannel(this, ipc);
if (ipc !== undefined) setupChannel(this, ipc, serialization);

return err;
};
Expand Down Expand Up @@ -513,7 +521,8 @@ class Control extends EventEmitter {
}
}

function setupChannel(target, channel) {
let serialization;
function setupChannel(target, channel, serializationMode) {
target.channel = channel;

// _channel can be deprecated in version 8
Expand All @@ -528,12 +537,16 @@ function setupChannel(target, channel) {

const control = new Control(channel);

if (StringDecoder === undefined)
StringDecoder = require('string_decoder').StringDecoder;
const decoder = new StringDecoder('utf8');
var jsonBuffer = '';
var pendingHandle = null;
channel.buffering = false;
if (serialization === undefined)
serialization = require('internal/child_process/serialization');
const {
initMessageChannel,
parseChannelMessages,
writeChannelMessage
} = serialization[serializationMode];

let pendingHandle = null;
initMessageChannel(channel);
channel.pendingHandle = null;
channel.onread = function(arrayBuffer) {
const recvHandle = channel.pendingHandle;
Expand All @@ -545,21 +558,7 @@ function setupChannel(target, channel) {
if (recvHandle)
pendingHandle = recvHandle;

// Linebreak is used as a message end sign
var chunks = decoder.write(pool).split('\n');
var numCompleteChunks = chunks.length - 1;
// Last line does not have trailing linebreak
var incompleteChunk = chunks[numCompleteChunks];
if (numCompleteChunks === 0) {
jsonBuffer += incompleteChunk;
this.buffering = jsonBuffer.length !== 0;
return;
}
chunks[0] = jsonBuffer + chunks[0];

for (var i = 0; i < numCompleteChunks; i++) {
var message = JSON.parse(chunks[i]);

for (const message of parseChannelMessages(channel, pool)) {
// There will be at most one NODE_HANDLE message in every chunk we
// read because SCM_RIGHTS messages don't get coalesced. Make sure
// that we deliver the handle with the right message however.
Expand All @@ -574,9 +573,6 @@ function setupChannel(target, channel) {
handleMessage(message, undefined, false);
}
}
jsonBuffer = incompleteChunk;
this.buffering = jsonBuffer.length !== 0;

} else {
this.buffering = false;
target.disconnect();
Expand Down Expand Up @@ -775,8 +771,7 @@ function setupChannel(target, channel) {

const req = new WriteWrap();

const string = JSON.stringify(message) + '\n';
const err = channel.writeUtf8String(req, string, handle);
const err = writeChannelMessage(channel, req, message, handle);
const wasAsyncWrite = streamBaseState[kLastWriteWasAsync];

if (err === 0) {
Expand Down
Loading

0 comments on commit df1e183

Please sign in to comment.