From cfd9ff9c02c035bf7c014fa330dd781fc2a9fd78 Mon Sep 17 00:00:00 2001 From: rubikscraft Date: Sat, 30 Apr 2022 23:00:33 +0200 Subject: [PATCH 1/5] child_process: speed up 'advanced' ipc receiving --- lib/internal/child_process/serialization.js | 45 ++++++++++++++++----- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/lib/internal/child_process/serialization.js b/lib/internal/child_process/serialization.js index ec858f401bea9e..5a5c79384f5fa7 100644 --- a/lib/internal/child_process/serialization.js +++ b/lib/internal/child_process/serialization.js @@ -15,6 +15,7 @@ const assert = require('internal/assert'); const { streamBaseState, kLastWriteWasAsync } = internalBinding('stream_wrap'); const kMessageBuffer = Symbol('kMessageBuffer'); +const kMessageBufferSize = Symbol('kMessageBufferSize'); const kJSONBuffer = Symbol('kJSONBuffer'); const kStringDecoder = Symbol('kStringDecoder'); @@ -51,29 +52,51 @@ class ChildProcessDeserializer extends v8.DefaultDeserializer { // (aka 'advanced') const advanced = { initMessageChannel(channel) { - channel[kMessageBuffer] = Buffer.alloc(0); + channel[kMessageBuffer] = []; + channel[kMessageBufferSize] = 0; channel.buffering = false; }, *parseChannelMessages(channel, readData) { if (readData.length === 0) return; - let messageBuffer = Buffer.concat([channel[kMessageBuffer], readData]); - while (messageBuffer.length > 4) { - const size = messageBuffer.readUInt32BE(); - if (messageBuffer.length < 4 + size) { - break; - } + channel[kMessageBuffer].push(readData); + channel[kMessageBufferSize] += readData.length; + + // Index 0 should always be present because we just pushed data into it. + let messageBufferHead = channel[kMessageBuffer][0]; + while (messageBufferHead.length >= 4) { + // We read the uint manually here, because this is faster than first converting + // it to a buffer and using `readUInt32BE` on that. + const size = + messageBufferHead[0] << 24 | + messageBufferHead[1] << 16 | + messageBufferHead[2] << 8 | + messageBufferHead[3]; + + if (channel[kMessageBufferSize] < 4 + size) break; + + const concatenatedBuffer = channel[kMessageBuffer].length === 1 ? + channel[kMessageBuffer][0] : + Buffer.concat( + channel[kMessageBuffer], + channel[kMessageBufferSize] + ); const deserializer = new ChildProcessDeserializer( - TypedArrayPrototypeSubarray(messageBuffer, 4, 4 + size)); - messageBuffer = TypedArrayPrototypeSubarray(messageBuffer, 4 + size); + TypedArrayPrototypeSubarray(concatenatedBuffer, 4, 4 + size) + ); + + messageBufferHead = TypedArrayPrototypeSubarray(concatenatedBuffer, 4 + size); + channel[kMessageBufferSize] = messageBufferHead.length; + channel[kMessageBuffer] = + channel[kMessageBufferSize] !== 0 ? [messageBufferHead] : []; deserializer.readHeader(); yield deserializer.readValue(); } - channel[kMessageBuffer] = messageBuffer; - channel.buffering = messageBuffer.length > 0; + + channel.buffering = channel[kMessageBufferSize] > 0; }, writeChannelMessage(channel, req, message, handle) { From ff29aeca7caa8d129402be9faeb80938e2f4e537 Mon Sep 17 00:00:00 2001 From: rubikscraft Date: Sun, 1 May 2022 16:28:58 +0200 Subject: [PATCH 2/5] child_process: add the suggestion from @BridgeAR --- lib/internal/child_process/serialization.js | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/lib/internal/child_process/serialization.js b/lib/internal/child_process/serialization.js index 5a5c79384f5fa7..26facbcd47ae1c 100644 --- a/lib/internal/child_process/serialization.js +++ b/lib/internal/child_process/serialization.js @@ -66,15 +66,11 @@ const advanced = { // Index 0 should always be present because we just pushed data into it. let messageBufferHead = channel[kMessageBuffer][0]; while (messageBufferHead.length >= 4) { - // We read the uint manually here, because this is faster than first converting + // We call `readUInt32BE` manually here, because this is faster than first converting // it to a buffer and using `readUInt32BE` on that. - const size = - messageBufferHead[0] << 24 | - messageBufferHead[1] << 16 | - messageBufferHead[2] << 8 | - messageBufferHead[3]; + const fullMessageSize = Buffer.prototype.readUInt32BE.call(messageBufferHead, 0) + 4; - if (channel[kMessageBufferSize] < 4 + size) break; + if (channel[kMessageBufferSize] < fullMessageSize) break; const concatenatedBuffer = channel[kMessageBuffer].length === 1 ? channel[kMessageBuffer][0] : @@ -84,10 +80,10 @@ const advanced = { ); const deserializer = new ChildProcessDeserializer( - TypedArrayPrototypeSubarray(concatenatedBuffer, 4, 4 + size) + TypedArrayPrototypeSubarray(concatenatedBuffer, 4, fullMessageSize) ); - messageBufferHead = TypedArrayPrototypeSubarray(concatenatedBuffer, 4 + size); + messageBufferHead = TypedArrayPrototypeSubarray(concatenatedBuffer, fullMessageSize); channel[kMessageBufferSize] = messageBufferHead.length; channel[kMessageBuffer] = channel[kMessageBufferSize] !== 0 ? [messageBufferHead] : []; From abd50a083fb939e13f2f208080520f29f2f585ed Mon Sep 17 00:00:00 2001 From: rubikscraft Date: Mon, 2 May 2022 11:38:53 +0200 Subject: [PATCH 3/5] child_process: use primordials Suggested by @Himself65 and @anonrig --- lib/internal/child_process/serialization.js | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/internal/child_process/serialization.js b/lib/internal/child_process/serialization.js index 26facbcd47ae1c..f8b919ba300179 100644 --- a/lib/internal/child_process/serialization.js +++ b/lib/internal/child_process/serialization.js @@ -4,6 +4,8 @@ const { JSONParse, JSONStringify, StringPrototypeSplit, + ArrayPrototypePush, + ReflectApply, Symbol, TypedArrayPrototypeSubarray, } = primordials; @@ -13,6 +15,7 @@ const v8 = require('v8'); const { isArrayBufferView } = require('internal/util/types'); const assert = require('internal/assert'); const { streamBaseState, kLastWriteWasAsync } = internalBinding('stream_wrap'); +const { readUInt32BE } = require('internal/buffer'); const kMessageBuffer = Symbol('kMessageBuffer'); const kMessageBufferSize = Symbol('kMessageBufferSize'); @@ -60,7 +63,7 @@ const advanced = { *parseChannelMessages(channel, readData) { if (readData.length === 0) return; - channel[kMessageBuffer].push(readData); + ArrayPrototypePush(channel[kMessageBuffer], readData); channel[kMessageBufferSize] += readData.length; // Index 0 should always be present because we just pushed data into it. @@ -68,7 +71,7 @@ const advanced = { while (messageBufferHead.length >= 4) { // We call `readUInt32BE` manually here, because this is faster than first converting // it to a buffer and using `readUInt32BE` on that. - const fullMessageSize = Buffer.prototype.readUInt32BE.call(messageBufferHead, 0) + 4; + const fullMessageSize = ReflectApply(readUInt32BE, messageBufferHead, [0]) + 4; if (channel[kMessageBufferSize] < fullMessageSize) break; From e36588b0328156f03783ac51eba05e6c3cb62b43 Mon Sep 17 00:00:00 2001 From: rubikscraft Date: Wed, 15 Jun 2022 17:41:43 +0200 Subject: [PATCH 4/5] child_process: improve ipc write performance --- lib/internal/child_process/serialization.js | 25 +++++++++++++-------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/lib/internal/child_process/serialization.js b/lib/internal/child_process/serialization.js index f8b919ba300179..433a55a35fdc2f 100644 --- a/lib/internal/child_process/serialization.js +++ b/lib/internal/child_process/serialization.js @@ -100,20 +100,27 @@ const advanced = { writeChannelMessage(channel, req, message, handle) { const ser = new ChildProcessSerializer(); + // Add 4 bytes, to later populate with message length + ser.writeRawBytes(Buffer.allocUnsafe(4)); ser.writeHeader(); ser.writeValue(message); + const serializedMessage = ser.releaseBuffer(); - const sizeBuffer = Buffer.allocUnsafe(4); - sizeBuffer.writeUInt32BE(serializedMessage.length); - - const buffer = Buffer.concat([ - sizeBuffer, - serializedMessage, - ]); - const result = channel.writeBuffer(req, buffer, handle); + const serializedMessageLength = serializedMessage.length - 4; + + serializedMessage.set([ + serializedMessageLength >> 24 & 0xFF, + serializedMessageLength >> 16 & 0xFF, + serializedMessageLength >> 8 & 0xFF, + serializedMessageLength & 0xFF, + ], 0); + + const result = channel.writeBuffer(req, serializedMessage, handle); + // Mirror what stream_base_commons.js does for Buffer retention. if (streamBaseState[kLastWriteWasAsync]) - req.buffer = buffer; + req.buffer = serializedMessage; + return result; }, }; From 4db1292429b91f629898021d42352bc1adcc4c9b Mon Sep 17 00:00:00 2001 From: rubikscraft Date: Wed, 15 Jun 2022 16:06:12 +0200 Subject: [PATCH 5/5] child_process: add suggestion from @bnoordhuis --- lib/internal/child_process/serialization.js | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/internal/child_process/serialization.js b/lib/internal/child_process/serialization.js index 433a55a35fdc2f..497bf233d77897 100644 --- a/lib/internal/child_process/serialization.js +++ b/lib/internal/child_process/serialization.js @@ -5,7 +5,6 @@ const { JSONStringify, StringPrototypeSplit, ArrayPrototypePush, - ReflectApply, Symbol, TypedArrayPrototypeSubarray, } = primordials; @@ -15,7 +14,6 @@ const v8 = require('v8'); const { isArrayBufferView } = require('internal/util/types'); const assert = require('internal/assert'); const { streamBaseState, kLastWriteWasAsync } = internalBinding('stream_wrap'); -const { readUInt32BE } = require('internal/buffer'); const kMessageBuffer = Symbol('kMessageBuffer'); const kMessageBufferSize = Symbol('kMessageBufferSize'); @@ -71,7 +69,12 @@ const advanced = { while (messageBufferHead.length >= 4) { // We call `readUInt32BE` manually here, because this is faster than first converting // it to a buffer and using `readUInt32BE` on that. - const fullMessageSize = ReflectApply(readUInt32BE, messageBufferHead, [0]) + 4; + const fullMessageSize = ( + messageBufferHead[0] << 24 | + messageBufferHead[1] << 16 | + messageBufferHead[2] << 8 | + messageBufferHead[3] + ) + 4; if (channel[kMessageBufferSize] < fullMessageSize) break;