Skip to content

Commit

Permalink
child_process: speed up 'advanced' ipc receiving
Browse files Browse the repository at this point in the history
PR-URL: #42931
Reviewed-By: Ruben Bridgewater <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
Reviewed-By: Zeyu "Alex" Yang <[email protected]>
  • Loading branch information
CaramelFur authored and targos committed Jul 18, 2022
1 parent dfa8bbb commit f39bc5f
Showing 1 changed file with 33 additions and 11 deletions.
44 changes: 33 additions & 11 deletions lib/internal/child_process/serialization.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ const {
JSONParse,
JSONStringify,
StringPrototypeSplit,
ArrayPrototypePush,
ReflectApply,
Symbol,
TypedArrayPrototypeSubarray,
} = primordials;
Expand All @@ -13,8 +15,10 @@ const v8 = require('v8');
const { isArrayBufferView } = require('internal/util/types');
const assert = require('internal/assert');
const { streamBaseState, kLastWriteWasAsync } = internalBinding('stream_wrap');
const { readUInt32BE } = require('internal/buffer');

const kMessageBuffer = Symbol('kMessageBuffer');
const kMessageBufferSize = Symbol('kMessageBufferSize');
const kJSONBuffer = Symbol('kJSONBuffer');
const kStringDecoder = Symbol('kStringDecoder');

Expand Down Expand Up @@ -51,29 +55,47 @@ class ChildProcessDeserializer extends v8.DefaultDeserializer {
// (aka 'advanced')
const advanced = {
initMessageChannel(channel) {
channel[kMessageBuffer] = Buffer.alloc(0);
channel[kMessageBuffer] = [];
channel[kMessageBufferSize] = 0;
channel.buffering = false;
},

*parseChannelMessages(channel, readData) {
if (readData.length === 0) return;

let messageBuffer = Buffer.concat([channel[kMessageBuffer], readData]);
while (messageBuffer.length > 4) {
const size = messageBuffer.readUInt32BE();
if (messageBuffer.length < 4 + size) {
break;
}
ArrayPrototypePush(channel[kMessageBuffer], readData);
channel[kMessageBufferSize] += readData.length;

// Index 0 should always be present because we just pushed data into it.
let messageBufferHead = channel[kMessageBuffer][0];
while (messageBufferHead.length >= 4) {
// We call `readUInt32BE` manually here, because this is faster than first converting
// it to a buffer and using `readUInt32BE` on that.
const fullMessageSize = ReflectApply(readUInt32BE, messageBufferHead, [0]) + 4;

if (channel[kMessageBufferSize] < fullMessageSize) break;

const concatenatedBuffer = channel[kMessageBuffer].length === 1 ?
channel[kMessageBuffer][0] :
Buffer.concat(
channel[kMessageBuffer],
channel[kMessageBufferSize]
);

const deserializer = new ChildProcessDeserializer(
TypedArrayPrototypeSubarray(messageBuffer, 4, 4 + size));
messageBuffer = TypedArrayPrototypeSubarray(messageBuffer, 4 + size);
TypedArrayPrototypeSubarray(concatenatedBuffer, 4, fullMessageSize)
);

messageBufferHead = TypedArrayPrototypeSubarray(concatenatedBuffer, fullMessageSize);
channel[kMessageBufferSize] = messageBufferHead.length;
channel[kMessageBuffer] =
channel[kMessageBufferSize] !== 0 ? [messageBufferHead] : [];

deserializer.readHeader();
yield deserializer.readValue();
}
channel[kMessageBuffer] = messageBuffer;
channel.buffering = messageBuffer.length > 0;

channel.buffering = channel[kMessageBufferSize] > 0;
},

writeChannelMessage(channel, req, message, handle) {
Expand Down

0 comments on commit f39bc5f

Please sign in to comment.